import com.google.common.base.MoreObjects.ToStringHelper;
import com.google.common.base.Preconditions;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-import java.util.Iterator;
+import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public abstract Optional<T> getBackendInfo();
- final Iterable<ConnectionEntry> startReplay() {
+ final Collection<ConnectionEntry> startReplay() {
lock.lock();
- return queue.asIterable();
+ return queue.drain();
}
@GuardedBy("lock")
final void finishReplay(final ReconnectForwarder forwarder) {
- queue.setForwarder(forwarder);
+ setForwarder(forwarder);
lock.unlock();
}
@GuardedBy("lock")
final void setForwarder(final ReconnectForwarder forwarder) {
- final long now = currentTime();
- final Iterator<ConnectionEntry> it = queue.asIterable().iterator();
- while (it.hasNext()) {
- final ConnectionEntry e = it.next();
- forwarder.forwardEntry(e, now);
- it.remove();
- }
-
- queue.setForwarder(forwarder);
+ queue.setForwarder(forwarder, currentTime());
}
@GuardedBy("lock")
import com.google.common.annotations.Beta;
import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
+import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
* @param enqueuedEntries Previously-enqueued entries
* @return A {@link ReconnectForwarder} to handle any straggler messages which arrive after this method returns.
*/
- @Nonnull ReconnectForwarder finishReconnect(@Nonnull Iterable<ConnectionEntry> enqueuedEntries);
+ @Nonnull ReconnectForwarder finishReconnect(@Nonnull Collection<ConnectionEntry> enqueuedEntries);
}
private static final Logger LOG = LoggerFactory.getLogger(ClientActorBehavior.class);
final ConnectionConnectCohort cohort = Verify.verifyNotNull(connectionUp(newConn));
// Lock the old connection and get a reference to its entries
- final Iterable<ConnectionEntry> replayIterable = conn.startReplay();
+ final Collection<ConnectionEntry> replayIterable = conn.startReplay();
// Finish the connection attempt
final ReconnectForwarder forwarder = Verify.verifyNotNull(cohort.finishReconnect(replayIterable));
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
-import com.google.common.collect.Iterables;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.ArrayDeque;
+import java.util.Collection;
import java.util.Deque;
import java.util.Iterator;
import java.util.Optional;
tracker = new AveragingProgressTracker(targetDepth);
}
- final Iterable<ConnectionEntry> asIterable() {
- return Iterables.concat(inflight, pending);
+ /**
+ * Drain the contents of the connection into a list. This will leave the queue empty and allow further entries
+ * to be added to it during replay. When we set the successor all entries enqueued between when this methods
+ * returns and the successor is set will be replayed to the successor.
+ *
+ * @return Collection of entries present in the queue.
+ */
+ final Collection<ConnectionEntry> drain() {
+ final Collection<ConnectionEntry> ret = new ArrayDeque<>(inflight.size() + pending.size());
+ ret.addAll(inflight);
+ ret.addAll(pending);
+ inflight.clear();
+ pending.clear();
+ return ret;
}
final long ticksStalling(final long now) {
poisonQueue(pending, cause);
}
- final void setForwarder(final ReconnectForwarder forwarder) {
+ final void setForwarder(final ReconnectForwarder forwarder, final long now) {
Verify.verify(successor == null, "Successor {} already set on connection {}", successor, this);
- Verify.verify(inflight.isEmpty(), "In-flight requests after replay: %s", inflight);
- Verify.verify(pending.isEmpty(), "Pending requests after replay: %s", pending);
-
successor = Preconditions.checkNotNull(forwarder);
- LOG.debug("Connection {} superseded by {}", this, successor);
+ LOG.debug("Connection {} superseded by {}, splicing queue", this, successor);
+
+ /*
+ * We need to account for entries which have been added between the time drain() was called and this method
+ * is invoked. Since the old connection is visible during replay and some entries may have completed on the
+ * replay thread, there was an avenue for this to happen.
+ */
+ int count = 0;
+ ConnectionEntry entry = inflight.poll();
+ while (entry != null) {
+ successor.forwardEntry(entry, now);
+ entry = inflight.poll();
+ count++;
+ }
+
+ entry = pending.poll();
+ while (entry != null) {
+ successor.forwardEntry(entry, now);
+ entry = pending.poll();
+ count++;
+ }
+
+ LOG.debug("Connection {} queue spliced {} messages", this, count);
}
final void remove(final long now) {
import akka.testkit.JavaTestKit;
import akka.testkit.TestProbe;
import com.google.common.base.Ticker;
-import com.google.common.collect.Iterables;
+import java.util.Collection;
import java.util.Optional;
import java.util.function.Consumer;
import org.junit.After;
@Test
public void testAsIterable() throws Exception {
- final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
+ final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
final Consumer<Response<?, ?>> callback = createConsumerMock();
final long now = Ticker.systemTicker().read();
final int sentMessages = getMaxInFlightMessages() + 1;
for (int i = 0; i < sentMessages; i++) {
queue.enqueue(new ConnectionEntry(request, callback, now), now);
}
- final Iterable<ConnectionEntry> entries = queue.asIterable();
- Assert.assertEquals(sentMessages, Iterables.size(entries));
+ final Collection<ConnectionEntry> entries = queue.drain();
+ Assert.assertEquals(sentMessages, entries.size());
Assert.assertThat(entries, everyItem(entryWithRequest(request)));
}
final long requestSequence = 0L;
final long txSequence = 0L;
final long sessionId = 0L;
- final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, requestSequence, probe.ref());
+ final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, requestSequence, probe.ref());
final Consumer<Response<?, ?>> callback = createConsumerMock();
final long now = Ticker.systemTicker().read();
queue.enqueue(new ConnectionEntry(request, callback, now), now);
@Test
public void testIsEmpty() throws Exception {
Assert.assertTrue(queue.isEmpty());
- final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
+ final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
final Consumer<Response<?, ?>> callback = createConsumerMock();
final long now = Ticker.systemTicker().read();
queue.enqueue(new ConnectionEntry(request, callback, now), now);
@Test
public void testPeek() throws Exception {
- final Request request1 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
- final Request request2 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 1L, probe.ref());
+ final Request<?, ?> request1 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
+ final Request<?, ?> request2 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 1L, probe.ref());
final Consumer<Response<?, ?>> callback = createConsumerMock();
final long now = Ticker.systemTicker().read();
final ConnectionEntry entry1 = new ConnectionEntry(request1, callback, now);
@Test
public void testPoison() throws Exception {
- final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
+ final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
final Consumer<Response<?, ?>> callback = createConsumerMock();
final long now = Ticker.systemTicker().read();
queue.enqueue(new ConnectionEntry(request, callback, now), now);
import com.google.common.base.Ticker;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
import com.google.common.testing.FakeTicker;
import java.util.Arrays;
import java.util.Collection;
probe.expectMsgClass(RequestEnvelope.class);
}
probe.expectNoMsg();
- final Iterable<ConnectionEntry> entries = queue.asIterable();
- assertEquals(sentMessages, Iterables.size(entries));
+ final Collection<ConnectionEntry> entries = queue.drain();
+ assertEquals(sentMessages, entries.size());
assertThat(entries, everyItem(entryWithRequest(request)));
}
final Consumer<Response<?, ?>> callback = createConsumerMock();
final ConnectionEntry entry = new ConnectionEntry(request, callback, ticker.read());
final ReconnectForwarder forwarder = mock(ReconnectForwarder.class);
- queue.setForwarder(forwarder);
+ queue.setForwarder(forwarder, ticker.read());
final long secondEnqueueNow = ticker.read();
queue.enqueue(entry, secondEnqueueNow);
verify(forwarder).forwardEntry(entry, secondEnqueueNow);
import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
+import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
}
@Override
- void replayRequests(final Iterable<ConnectionEntry> previousEntries) {
+ void replayRequests(final Collection<ConnectionEntry> previousEntries) {
proxy.replayRequests(previousEntries);
}
*/
package org.opendaylight.controller.cluster.databroker.actors.dds;
+import java.util.Collection;
import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
/**
abstract class HistoryReconnectCohort implements AutoCloseable {
abstract ProxyReconnectCohort getProxy();
- abstract void replayRequests(Iterable<ConnectionEntry> previousEntries);
+ abstract void replayRequests(Collection<ConnectionEntry> previousEntries);
@Override
public abstract void close();
import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
@GuardedBy("lock")
@Override
- void replayRequests(final Iterable<ConnectionEntry> previousEntries) {
+ void replayRequests(final Collection<ConnectionEntry> previousEntries) {
// First look for our Create message
Iterator<ConnectionEntry> it = previousEntries.iterator();
while (it.hasNext()) {
*/
package org.opendaylight.controller.cluster.databroker.actors.dds;
+import java.util.Collection;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
abstract class ProxyReconnectCohort implements Identifiable<LocalHistoryIdentifier> {
- abstract void replayRequests(Iterable<ConnectionEntry> previousEntries);
+ abstract void replayRequests(Collection<ConnectionEntry> previousEntries);
abstract ProxyHistory finishReconnect();