From b74c6012092e47430a8f4d6f4ddeb1d3e2b1b7df Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Fri, 19 May 2017 17:21:42 +0200 Subject: [PATCH] BUG-8403: guard against ConcurrentModificationException Using TransmitQueue.asIterable() offers slight advantage of not dealing with a big list, but exposes us to the risk of the Iterable being changed. The point missed by the fix to BUG 8491 is that there is an avenue for the old connection to be touched during replay, as we are completing entries, for example reads when we are switching from remote to local connection. In this case the callback will be invoked in the actor thread, with all the locks being reentrant and held, hence it can break through to the old connection's queue. If that happens we will see a ConcurrentModificationException and enter a buggy territory, where the client fails to work properly. Document this caveat and turn asIterable() into drain(), which removes all the entries in the queue, allowing new entries to be enqueued. The late-comer entries are accounted for when we set the forwarder. Change-Id: Idf29c1e565e12aaed917ac94c21c552daf169d4d Signed-off-by: Robert Varga (cherry picked from commit 930747a6ba5d888d2fbe54473132680e4621d858) --- .../client/AbstractClientConnection.java | 18 ++----- .../access/client/ClientActorBehavior.java | 5 +- .../cluster/access/client/TransmitQueue.java | 47 +++++++++++++++---- .../client/AbstractTransmitQueueTest.java | 18 +++---- .../client/TransmittingTransmitQueueTest.java | 7 ++- .../actors/dds/AbstractClientHistory.java | 3 +- .../actors/dds/HistoryReconnectCohort.java | 3 +- .../databroker/actors/dds/ProxyHistory.java | 3 +- .../actors/dds/ProxyReconnectCohort.java | 3 +- 9 files changed, 67 insertions(+), 40 deletions(-) diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java index 47c0676979..da016bae88 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java @@ -13,7 +13,7 @@ import com.google.common.base.MoreObjects; import com.google.common.base.MoreObjects.ToStringHelper; import com.google.common.base.Preconditions; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; -import java.util.Iterator; +import java.util.Collection; import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; @@ -156,28 +156,20 @@ public abstract class AbstractClientConnection { public abstract Optional getBackendInfo(); - final Iterable startReplay() { + final Collection startReplay() { lock.lock(); - return queue.asIterable(); + return queue.drain(); } @GuardedBy("lock") final void finishReplay(final ReconnectForwarder forwarder) { - queue.setForwarder(forwarder); + setForwarder(forwarder); lock.unlock(); } @GuardedBy("lock") final void setForwarder(final ReconnectForwarder forwarder) { - final long now = currentTime(); - final Iterator it = queue.asIterable().iterator(); - while (it.hasNext()) { - final ConnectionEntry e = it.next(); - forwarder.forwardEntry(e, now); - it.remove(); - } - - queue.setForwarder(forwarder); + queue.setForwarder(forwarder, currentTime()); } @GuardedBy("lock") diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java index ca78d0cb66..5d61885cd5 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java @@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.access.client; import com.google.common.annotations.Beta; import com.google.common.base.Preconditions; import com.google.common.base.Verify; +import java.util.Collection; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; @@ -54,7 +55,7 @@ public abstract class ClientActorBehavior extends * @param enqueuedEntries Previously-enqueued entries * @return A {@link ReconnectForwarder} to handle any straggler messages which arrive after this method returns. */ - @Nonnull ReconnectForwarder finishReconnect(@Nonnull Iterable enqueuedEntries); + @Nonnull ReconnectForwarder finishReconnect(@Nonnull Collection enqueuedEntries); } private static final Logger LOG = LoggerFactory.getLogger(ClientActorBehavior.class); @@ -286,7 +287,7 @@ public abstract class ClientActorBehavior extends final ConnectionConnectCohort cohort = Verify.verifyNotNull(connectionUp(newConn)); // Lock the old connection and get a reference to its entries - final Iterable replayIterable = conn.startReplay(); + final Collection replayIterable = conn.startReplay(); // Finish the connection attempt final ReconnectForwarder forwarder = Verify.verifyNotNull(cohort.finishReconnect(replayIterable)); diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java index 28902d28ba..b7543410cd 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java @@ -11,9 +11,9 @@ import akka.actor.ActorRef; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Verify; -import com.google.common.collect.Iterables; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.util.ArrayDeque; +import java.util.Collection; import java.util.Deque; import java.util.Iterator; import java.util.Optional; @@ -106,8 +106,20 @@ abstract class TransmitQueue { tracker = new AveragingProgressTracker(targetDepth); } - final Iterable asIterable() { - return Iterables.concat(inflight, pending); + /** + * Drain the contents of the connection into a list. This will leave the queue empty and allow further entries + * to be added to it during replay. When we set the successor all entries enqueued between when this methods + * returns and the successor is set will be replayed to the successor. + * + * @return Collection of entries present in the queue. + */ + final Collection drain() { + final Collection ret = new ArrayDeque<>(inflight.size() + pending.size()); + ret.addAll(inflight); + ret.addAll(pending); + inflight.clear(); + pending.clear(); + return ret; } final long ticksStalling(final long now) { @@ -232,13 +244,32 @@ abstract class TransmitQueue { poisonQueue(pending, cause); } - final void setForwarder(final ReconnectForwarder forwarder) { + final void setForwarder(final ReconnectForwarder forwarder, final long now) { Verify.verify(successor == null, "Successor {} already set on connection {}", successor, this); - Verify.verify(inflight.isEmpty(), "In-flight requests after replay: %s", inflight); - Verify.verify(pending.isEmpty(), "Pending requests after replay: %s", pending); - successor = Preconditions.checkNotNull(forwarder); - LOG.debug("Connection {} superseded by {}", this, successor); + LOG.debug("Connection {} superseded by {}, splicing queue", this, successor); + + /* + * We need to account for entries which have been added between the time drain() was called and this method + * is invoked. Since the old connection is visible during replay and some entries may have completed on the + * replay thread, there was an avenue for this to happen. + */ + int count = 0; + ConnectionEntry entry = inflight.poll(); + while (entry != null) { + successor.forwardEntry(entry, now); + entry = inflight.poll(); + count++; + } + + entry = pending.poll(); + while (entry != null) { + successor.forwardEntry(entry, now); + entry = pending.poll(); + count++; + } + + LOG.debug("Connection {} queue spliced {} messages", this, count); } final void remove(final long now) { diff --git a/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/AbstractTransmitQueueTest.java b/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/AbstractTransmitQueueTest.java index 8e07804633..b5f1bdac7e 100644 --- a/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/AbstractTransmitQueueTest.java +++ b/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/AbstractTransmitQueueTest.java @@ -17,7 +17,7 @@ import akka.actor.ActorSystem; import akka.testkit.JavaTestKit; import akka.testkit.TestProbe; import com.google.common.base.Ticker; -import com.google.common.collect.Iterables; +import java.util.Collection; import java.util.Optional; import java.util.function.Consumer; import org.junit.After; @@ -74,15 +74,15 @@ public abstract class AbstractTransmitQueueTest { @Test public void testAsIterable() throws Exception { - final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref()); + final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref()); final Consumer> callback = createConsumerMock(); final long now = Ticker.systemTicker().read(); final int sentMessages = getMaxInFlightMessages() + 1; for (int i = 0; i < sentMessages; i++) { queue.enqueue(new ConnectionEntry(request, callback, now), now); } - final Iterable entries = queue.asIterable(); - Assert.assertEquals(sentMessages, Iterables.size(entries)); + final Collection entries = queue.drain(); + Assert.assertEquals(sentMessages, entries.size()); Assert.assertThat(entries, everyItem(entryWithRequest(request))); } @@ -97,7 +97,7 @@ public abstract class AbstractTransmitQueueTest { final long requestSequence = 0L; final long txSequence = 0L; final long sessionId = 0L; - final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, requestSequence, probe.ref()); + final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, requestSequence, probe.ref()); final Consumer> callback = createConsumerMock(); final long now = Ticker.systemTicker().read(); queue.enqueue(new ConnectionEntry(request, callback, now), now); @@ -133,7 +133,7 @@ public abstract class AbstractTransmitQueueTest { @Test public void testIsEmpty() throws Exception { Assert.assertTrue(queue.isEmpty()); - final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref()); + final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref()); final Consumer> callback = createConsumerMock(); final long now = Ticker.systemTicker().read(); queue.enqueue(new ConnectionEntry(request, callback, now), now); @@ -142,8 +142,8 @@ public abstract class AbstractTransmitQueueTest { @Test public void testPeek() throws Exception { - final Request request1 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref()); - final Request request2 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 1L, probe.ref()); + final Request request1 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref()); + final Request request2 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 1L, probe.ref()); final Consumer> callback = createConsumerMock(); final long now = Ticker.systemTicker().read(); final ConnectionEntry entry1 = new ConnectionEntry(request1, callback, now); @@ -155,7 +155,7 @@ public abstract class AbstractTransmitQueueTest { @Test public void testPoison() throws Exception { - final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref()); + final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref()); final Consumer> callback = createConsumerMock(); final long now = Ticker.systemTicker().read(); queue.enqueue(new ConnectionEntry(request, callback, now), now); diff --git a/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/TransmittingTransmitQueueTest.java b/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/TransmittingTransmitQueueTest.java index 752c12771a..6dd38ce213 100644 --- a/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/TransmittingTransmitQueueTest.java +++ b/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/TransmittingTransmitQueueTest.java @@ -19,7 +19,6 @@ import static org.opendaylight.controller.cluster.access.client.ConnectionEntryM import com.google.common.base.Ticker; import com.google.common.collect.Collections2; import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; import com.google.common.testing.FakeTicker; import java.util.Arrays; import java.util.Collection; @@ -111,8 +110,8 @@ public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest entries = queue.asIterable(); - assertEquals(sentMessages, Iterables.size(entries)); + final Collection entries = queue.drain(); + assertEquals(sentMessages, entries.size()); assertThat(entries, everyItem(entryWithRequest(request))); } @@ -143,7 +142,7 @@ public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest> callback = createConsumerMock(); final ConnectionEntry entry = new ConnectionEntry(request, callback, ticker.read()); final ReconnectForwarder forwarder = mock(ReconnectForwarder.class); - queue.setForwarder(forwarder); + queue.setForwarder(forwarder, ticker.read()); final long secondEnqueueNow = ticker.read(); queue.enqueue(entry, secondEnqueueNow); verify(forwarder).forwardEntry(entry, secondEnqueueNow); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistory.java index dae12af731..1e8d03a8ec 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistory.java @@ -9,6 +9,7 @@ package org.opendaylight.controller.cluster.databroker.actors.dds; import com.google.common.base.Preconditions; import com.google.common.base.Verify; +import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -306,7 +307,7 @@ public abstract class AbstractClientHistory extends LocalAbortable implements Id } @Override - void replayRequests(final Iterable previousEntries) { + void replayRequests(final Collection previousEntries) { proxy.replayRequests(previousEntries); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/HistoryReconnectCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/HistoryReconnectCohort.java index f1939dc846..b5fcca64ce 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/HistoryReconnectCohort.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/HistoryReconnectCohort.java @@ -7,6 +7,7 @@ */ package org.opendaylight.controller.cluster.databroker.actors.dds; +import java.util.Collection; import org.opendaylight.controller.cluster.access.client.ConnectionEntry; /** @@ -18,7 +19,7 @@ import org.opendaylight.controller.cluster.access.client.ConnectionEntry; abstract class HistoryReconnectCohort implements AutoCloseable { abstract ProxyReconnectCohort getProxy(); - abstract void replayRequests(Iterable previousEntries); + abstract void replayRequests(Collection previousEntries); @Override public abstract void close(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java index 802d9ed0b3..7c3b2010c2 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java @@ -11,6 +11,7 @@ import akka.actor.ActorRef; import com.google.common.base.Preconditions; import com.google.common.base.Verify; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.util.Collection; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.Map; @@ -212,7 +213,7 @@ abstract class ProxyHistory implements Identifiable { @GuardedBy("lock") @Override - void replayRequests(final Iterable previousEntries) { + void replayRequests(final Collection previousEntries) { // First look for our Create message Iterator it = previousEntries.iterator(); while (it.hasNext()) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyReconnectCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyReconnectCohort.java index 0ad4dc5336..14bf0645f7 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyReconnectCohort.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyReconnectCohort.java @@ -7,6 +7,7 @@ */ package org.opendaylight.controller.cluster.databroker.actors.dds; +import java.util.Collection; import java.util.function.BiConsumer; import java.util.function.Consumer; import org.opendaylight.controller.cluster.access.client.ConnectionEntry; @@ -18,7 +19,7 @@ import org.opendaylight.yangtools.concepts.Identifiable; abstract class ProxyReconnectCohort implements Identifiable { - abstract void replayRequests(Iterable previousEntries); + abstract void replayRequests(Collection previousEntries); abstract ProxyHistory finishReconnect(); -- 2.36.6