Using TransmitQueue.asIterable() offers slight advantage of not
dealing with a big list, but exposes us to the risk of the Iterable
being changed.
The point missed by the fix to BUG 8491 is that there is an avenue
for the old connection to be touched during replay, as we are
completing entries, for example reads when we are switching from
remote to local connection. In this case the callback will be invoked
in the actor thread, with all the locks being reentrant and held,
hence it can break through to the old connection's queue.
If that happens we will see a ConcurrentModificationException and
enter buggy territory, where the client fails to work properly.
Document this caveat and turn asIterable() into drain(), which
removes all the entries in the queue, allowing new entries to be
enqueued. The late-comer entries are accounted for when we set the
forwarder.
Change-Id: Idf29c1e565e12aaed917ac94c21c552daf169d4d
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
(cherry picked from commit
930747a6ba5d888d2fbe54473132680e4621d858)
import com.google.common.base.MoreObjects.ToStringHelper;
import com.google.common.base.Preconditions;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-import java.util.Iterator;
+import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
public abstract Optional<T> getBackendInfo();
- final Iterable<ConnectionEntry> startReplay() {
+ final Collection<ConnectionEntry> startReplay() {
lock.lock();
- return queue.asIterable();
+ return queue.drain();
}
@GuardedBy("lock")
final void finishReplay(final ReconnectForwarder forwarder) {
- queue.setForwarder(forwarder);
+ setForwarder(forwarder);
lock.unlock();
}
@GuardedBy("lock")
final void setForwarder(final ReconnectForwarder forwarder) {
- final long now = currentTime();
- final Iterator<ConnectionEntry> it = queue.asIterable().iterator();
- while (it.hasNext()) {
- final ConnectionEntry e = it.next();
- forwarder.forwardEntry(e, now);
- it.remove();
- }
-
- queue.setForwarder(forwarder);
+ queue.setForwarder(forwarder, currentTime());
}
@GuardedBy("lock")
import com.google.common.annotations.Beta;
import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
+import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
* @param enqueuedEntries Previously-enqueued entries
* @return A {@link ReconnectForwarder} to handle any straggler messages which arrive after this method returns.
*/
- @Nonnull ReconnectForwarder finishReconnect(@Nonnull Iterable<ConnectionEntry> enqueuedEntries);
+ @Nonnull ReconnectForwarder finishReconnect(@Nonnull Collection<ConnectionEntry> enqueuedEntries);
}
private static final Logger LOG = LoggerFactory.getLogger(ClientActorBehavior.class);
final ConnectionConnectCohort cohort = Verify.verifyNotNull(connectionUp(newConn));
// Lock the old connection and get a reference to its entries
- final Iterable<ConnectionEntry> replayIterable = conn.startReplay();
+ final Collection<ConnectionEntry> replayIterable = conn.startReplay();
// Finish the connection attempt
final ReconnectForwarder forwarder = Verify.verifyNotNull(cohort.finishReconnect(replayIterable));
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
-import com.google.common.collect.Iterables;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.ArrayDeque;
+import java.util.Collection;
import java.util.Deque;
import java.util.Iterator;
import java.util.Optional;
tracker = new AveragingProgressTracker(targetDepth);
}
- final Iterable<ConnectionEntry> asIterable() {
- return Iterables.concat(inflight, pending);
+ /**
+ * Drain the contents of the connection into a list. This will leave the queue empty and allow further entries
+ * to be added to it during replay. When we set the successor all entries enqueued between when this methods
+ * returns and the successor is set will be replayed to the successor.
+ *
+ * @return Collection of entries present in the queue.
+ */
+ final Collection<ConnectionEntry> drain() {
+ final Collection<ConnectionEntry> ret = new ArrayDeque<>(inflight.size() + pending.size());
+ ret.addAll(inflight);
+ ret.addAll(pending);
+ inflight.clear();
+ pending.clear();
+ return ret;
}
final long ticksStalling(final long now) {
poisonQueue(pending, cause);
}
- final void setForwarder(final ReconnectForwarder forwarder) {
+ final void setForwarder(final ReconnectForwarder forwarder, final long now) {
Verify.verify(successor == null, "Successor {} already set on connection {}", successor, this);
- Verify.verify(inflight.isEmpty(), "In-flight requests after replay: %s", inflight);
- Verify.verify(pending.isEmpty(), "Pending requests after replay: %s", pending);
-
successor = Preconditions.checkNotNull(forwarder);
- LOG.debug("Connection {} superseded by {}", this, successor);
+ LOG.debug("Connection {} superseded by {}, splicing queue", this, successor);
+
+ /*
+ * We need to account for entries which have been added between the time drain() was called and this method
+ * is invoked. Since the old connection is visible during replay and some entries may have completed on the
+ * replay thread, there was an avenue for this to happen.
+ */
+ int count = 0;
+ ConnectionEntry entry = inflight.poll();
+ while (entry != null) {
+ successor.forwardEntry(entry, now);
+ entry = inflight.poll();
+ count++;
+ }
+
+ entry = pending.poll();
+ while (entry != null) {
+ successor.forwardEntry(entry, now);
+ entry = pending.poll();
+ count++;
+ }
+
+ LOG.debug("Connection {} queue spliced {} messages", this, count);
}
final void remove(final long now) {
import akka.testkit.JavaTestKit;
import akka.testkit.TestProbe;
import com.google.common.base.Ticker;
-import com.google.common.collect.Iterables;
+import java.util.Collection;
import java.util.Optional;
import java.util.function.Consumer;
import org.junit.After;
@Test
public void testAsIterable() throws Exception {
- final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
+ final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
final Consumer<Response<?, ?>> callback = createConsumerMock();
final long now = Ticker.systemTicker().read();
final int sentMessages = getMaxInFlightMessages() + 1;
for (int i = 0; i < sentMessages; i++) {
queue.enqueue(new ConnectionEntry(request, callback, now), now);
}
- final Iterable<ConnectionEntry> entries = queue.asIterable();
- Assert.assertEquals(sentMessages, Iterables.size(entries));
+ final Collection<ConnectionEntry> entries = queue.drain();
+ Assert.assertEquals(sentMessages, entries.size());
Assert.assertThat(entries, everyItem(entryWithRequest(request)));
}
final long requestSequence = 0L;
final long txSequence = 0L;
final long sessionId = 0L;
- final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, requestSequence, probe.ref());
+ final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, requestSequence, probe.ref());
final Consumer<Response<?, ?>> callback = createConsumerMock();
final long now = Ticker.systemTicker().read();
queue.enqueue(new ConnectionEntry(request, callback, now), now);
@Test
public void testIsEmpty() throws Exception {
Assert.assertTrue(queue.isEmpty());
- final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
+ final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
final Consumer<Response<?, ?>> callback = createConsumerMock();
final long now = Ticker.systemTicker().read();
queue.enqueue(new ConnectionEntry(request, callback, now), now);
@Test
public void testPeek() throws Exception {
- final Request request1 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
- final Request request2 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 1L, probe.ref());
+ final Request<?, ?> request1 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
+ final Request<?, ?> request2 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 1L, probe.ref());
final Consumer<Response<?, ?>> callback = createConsumerMock();
final long now = Ticker.systemTicker().read();
final ConnectionEntry entry1 = new ConnectionEntry(request1, callback, now);
@Test
public void testPoison() throws Exception {
- final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
+ final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
final Consumer<Response<?, ?>> callback = createConsumerMock();
final long now = Ticker.systemTicker().read();
queue.enqueue(new ConnectionEntry(request, callback, now), now);
import com.google.common.base.Ticker;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
import com.google.common.testing.FakeTicker;
import java.util.Arrays;
import java.util.Collection;
probe.expectMsgClass(RequestEnvelope.class);
}
probe.expectNoMsg();
- final Iterable<ConnectionEntry> entries = queue.asIterable();
- assertEquals(sentMessages, Iterables.size(entries));
+ final Collection<ConnectionEntry> entries = queue.drain();
+ assertEquals(sentMessages, entries.size());
assertThat(entries, everyItem(entryWithRequest(request)));
}
final Consumer<Response<?, ?>> callback = createConsumerMock();
final ConnectionEntry entry = new ConnectionEntry(request, callback, ticker.read());
final ReconnectForwarder forwarder = mock(ReconnectForwarder.class);
- queue.setForwarder(forwarder);
+ queue.setForwarder(forwarder, ticker.read());
final long secondEnqueueNow = ticker.read();
queue.enqueue(entry, secondEnqueueNow);
verify(forwarder).forwardEntry(entry, secondEnqueueNow);
import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
+import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
}
@Override
- void replayRequests(final Iterable<ConnectionEntry> previousEntries) {
+ void replayRequests(final Collection<ConnectionEntry> previousEntries) {
proxy.replayRequests(previousEntries);
}
*/
package org.opendaylight.controller.cluster.databroker.actors.dds;
+import java.util.Collection;
import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
/**
abstract class HistoryReconnectCohort implements AutoCloseable {
abstract ProxyReconnectCohort getProxy();
- abstract void replayRequests(Iterable<ConnectionEntry> previousEntries);
+ abstract void replayRequests(Collection<ConnectionEntry> previousEntries);
@Override
public abstract void close();
import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
@GuardedBy("lock")
@Override
- void replayRequests(final Iterable<ConnectionEntry> previousEntries) {
+ void replayRequests(final Collection<ConnectionEntry> previousEntries) {
// First look for our Create message
Iterator<ConnectionEntry> it = previousEntries.iterator();
while (it.hasNext()) {
*/
package org.opendaylight.controller.cluster.databroker.actors.dds;
+import java.util.Collection;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
abstract class ProxyReconnectCohort implements Identifiable<LocalHistoryIdentifier> {
- abstract void replayRequests(Iterable<ConnectionEntry> previousEntries);
+ abstract void replayRequests(Collection<ConnectionEntry> previousEntries);
abstract ProxyHistory finishReconnect();