BUG-8491: Remove requests as they are replayed 68/57568/1
authorRobert Varga <robert.varga@pantheon.tech>
Wed, 17 May 2017 21:39:55 +0000 (23:39 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Fri, 19 May 2017 18:17:09 +0000 (20:17 +0200)
We should not be seeing any messages just after we have finished
message replay, as the queue is still locked and we should have
accounted for all messages by removing them from the queue.

Change-Id: I47396b4705e048460934538acc470468a0a6285d
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
Signed-off-by: Tomas Cere <tcere@cisco.com>
(cherry picked from commit 585e116247f9b616579ffad1785a972621d928e7)

opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java
opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnectionTest.java
opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/ConnectedClientConnectionTest.java
opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/TransmittingTransmitQueueTest.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTreeTest.java

index 4dfe43b..47c0676 100644 (file)
@@ -13,6 +13,7 @@ import com.google.common.base.MoreObjects;
 import com.google.common.base.MoreObjects.ToStringHelper;
 import com.google.common.base.Preconditions;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.util.Iterator;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
@@ -162,13 +163,21 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
 
     @GuardedBy("lock")
     final void finishReplay(final ReconnectForwarder forwarder) {
-        setForwarder(forwarder);
+        queue.setForwarder(forwarder);
         lock.unlock();
     }
 
     @GuardedBy("lock")
     final void setForwarder(final ReconnectForwarder forwarder) {
-        queue.setForwarder(forwarder, currentTime());
+        final long now = currentTime();
+        final Iterator<ConnectionEntry> it = queue.asIterable().iterator();
+        while (it.hasNext()) {
+            final ConnectionEntry e = it.next();
+            forwarder.forwardEntry(e, now);
+            it.remove();
+        }
+
+        queue.setForwarder(forwarder);
     }
 
     @GuardedBy("lock")
index b2497fc..28902d2 100644 (file)
@@ -232,22 +232,13 @@ abstract class TransmitQueue {
         poisonQueue(pending, cause);
     }
 
-    final void setForwarder(final ReconnectForwarder forwarder, final long now) {
+    final void setForwarder(final ReconnectForwarder forwarder) {
         Verify.verify(successor == null, "Successor {} already set on connection {}", successor, this);
-        successor = Preconditions.checkNotNull(forwarder);
-        LOG.debug("Connection {} superseded by {}, splicing queue", this, successor);
-
-        ConnectionEntry entry = inflight.poll();
-        while (entry != null) {
-            successor.forwardEntry(entry, now);
-            entry = inflight.poll();
-        }
+        Verify.verify(inflight.isEmpty(), "In-flight requests after replay: %s", inflight);
+        Verify.verify(pending.isEmpty(), "Pending requests after replay: %s", pending);
 
-        entry = pending.poll();
-        while (entry != null) {
-            successor.forwardEntry(entry, now);
-            entry = pending.poll();
-        }
+        successor = Preconditions.checkNotNull(forwarder);
+        LOG.debug("Connection {} superseded by {}", this, successor);
     }
 
     final void remove(final long now) {
index 6a833a8..9fd3a21 100644 (file)
@@ -8,8 +8,6 @@
 package org.opendaylight.controller.cluster.access.client;
 
 import static org.hamcrest.CoreMatchers.hasItems;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.argThat;
 import static org.mockito.Matchers.isA;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.timeout;
@@ -20,6 +18,7 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.testkit.JavaTestKit;
 import akka.testkit.TestProbe;
+import com.google.common.collect.Iterables;
 import java.util.Optional;
 import java.util.function.Consumer;
 import org.junit.After;
@@ -145,10 +144,10 @@ public abstract class AbstractClientConnectionTest<T extends AbstractClientConne
         connection.sendRequest(request2, callback);
         final Iterable<ConnectionEntry> entries = connection.startReplay();
         Assert.assertThat(entries, hasItems(entryWithRequest(request1), entryWithRequest(request2)));
+        Assert.assertEquals(2, Iterables.size(entries));
+        Iterables.removeIf(entries, e -> true);
         final ReconnectForwarder forwarder = mock(ReconnectForwarder.class);
         connection.finishReplay(forwarder);
-        verify(forwarder).forwardEntry(argThat(entryWithRequest(request1)), anyLong());
-        verify(forwarder).forwardEntry(argThat(entryWithRequest(request2)), anyLong());
     }
 
     @After
index 2c1159e..eb10152 100644 (file)
@@ -19,7 +19,7 @@ public class ConnectedClientConnectionTest
         extends AbstractClientConnectionTest<ConnectedClientConnection<BackendInfo>, BackendInfo> {
 
     @Override
-    protected ConnectedClientConnection createConnection() {
+    protected ConnectedClientConnection<BackendInfo> createConnection() {
         final BackendInfo backend = new BackendInfo(backendProbe.ref(), 0L, ABIVersion.BORON, 10);
         return new ConnectedClientConnection<>(context, 0L, backend);
     }
index 358cc9d..752c127 100644 (file)
@@ -12,8 +12,6 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Matchers.isA;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.opendaylight.controller.cluster.access.client.ConnectionEntryMatcher.entryWithRequest;
@@ -144,11 +142,8 @@ public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest<Tra
         final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
         final Consumer<Response<?, ?>> callback = createConsumerMock();
         final ConnectionEntry entry = new ConnectionEntry(request, callback, ticker.read());
-        queue.enqueue(entry, ticker.read());
         final ReconnectForwarder forwarder = mock(ReconnectForwarder.class);
-        final long setForwarderNow = ticker.read();
-        queue.setForwarder(forwarder, setForwarderNow);
-        verify(forwarder).forwardEntry(isA(TransmittedConnectionEntry.class), eq(setForwarderNow));
+        queue.setForwarder(forwarder);
         final long secondEnqueueNow = ticker.read();
         queue.enqueue(entry, secondEnqueueNow);
         verify(forwarder).forwardEntry(entry, secondEnqueueNow);
index 8c3b485..802d9ed 100644 (file)
@@ -11,6 +11,7 @@ import akka.actor.ActorRef;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Verify;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Optional;
@@ -213,12 +214,15 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
         @Override
         void replayRequests(final Iterable<ConnectionEntry> previousEntries) {
             // First look for our Create message
-            for (ConnectionEntry e : previousEntries) {
+            Iterator<ConnectionEntry> it = previousEntries.iterator();
+            while (it.hasNext()) {
+                final ConnectionEntry e = it.next();
                 final Request<?, ?> req = e.getRequest();
                 if (identifier.equals(req.getTarget())) {
                     Verify.verify(req instanceof LocalHistoryRequest);
                     if (req instanceof CreateLocalHistoryRequest) {
                         successor.connection.sendRequest(req, e.getCallback());
+                        it.remove();
                         break;
                     }
                 }
@@ -233,11 +237,17 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
             }
 
             // Now look for any finalizing messages
-            for (ConnectionEntry e : previousEntries) {
+            it = previousEntries.iterator();
+            while (it.hasNext()) {
+                final ConnectionEntry e  = it.next();
                 final Request<?, ?> req = e.getRequest();
                 if (identifier.equals(req.getTarget())) {
                     Verify.verify(req instanceof LocalHistoryRequest);
-                    successor.connection.sendRequest(req, e.getCallback());
+                    if (req instanceof DestroyLocalHistoryRequest) {
+                        successor.connection.sendRequest(req, e.getCallback());
+                        it.remove();
+                        break;
+                    }
                 }
             }
         }
index 8faaf6a..b7b736e 100644 (file)
@@ -26,6 +26,7 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.Address;
 import akka.actor.AddressFromURIString;
+import akka.actor.Props;
 import akka.cluster.Cluster;
 import akka.testkit.JavaTestKit;
 import com.google.common.base.Optional;
@@ -39,6 +40,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletionStage;
+import java.util.concurrent.TimeUnit;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -50,11 +52,16 @@ import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
 import org.opendaylight.controller.cluster.ActorSystemProvider;
 import org.opendaylight.controller.cluster.access.concepts.MemberName;
+import org.opendaylight.controller.cluster.databroker.actors.dds.ClientLocalHistory;
+import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
+import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
+import org.opendaylight.controller.cluster.databroker.actors.dds.SimpleDataStoreClientActor;
 import org.opendaylight.controller.cluster.datastore.AbstractTest;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
 import org.opendaylight.controller.cluster.datastore.IntegrationTestKit;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
 import org.opendaylight.controller.cluster.dom.api.CDSDataTreeProducer;
 import org.opendaylight.controller.cluster.dom.api.CDSShardAccess;
@@ -202,7 +209,7 @@ public class DistributedShardedDOMDataTreeTest extends AbstractTest {
     }
 
     @Test
-    public void testSingleNodeWrites() throws Exception {
+    public void testSingleNodeWritesAndRead() throws Exception {
         initEmptyDatastores();
 
         final DistributedShardRegistration shardRegistration = waitOnAsyncTask(
@@ -249,6 +256,28 @@ public class DistributedShardedDOMDataTreeTest extends AbstractTest {
 
         verifyNoMoreInteractions(mockedDataTreeListener);
 
+        final String shardName = ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier());
+        LOG.debug("Creating distributed datastore client for shard {}", shardName);
+
+        final ActorContext actorContext = leaderDistributedDataStore.getActorContext();
+        final Props distributedDataStoreClientProps =
+                SimpleDataStoreClientActor.props(actorContext.getCurrentMemberName(),
+                        "Shard-" + shardName, actorContext, shardName);
+
+        final ActorRef clientActor = leaderSystem.actorOf(distributedDataStoreClientProps);
+        final DataStoreClient distributedDataStoreClient = SimpleDataStoreClientActor
+                    .getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS);
+
+        final ClientLocalHistory localHistory = distributedDataStoreClient.createLocalHistory();
+        final ClientTransaction tx2 = localHistory.createTransaction();
+        final CheckedFuture<Optional<NormalizedNode<?, ?>>,
+                org.opendaylight.mdsal.common.api.ReadFailedException> read =
+                tx2.read(YangInstanceIdentifier.EMPTY);
+
+        final Optional<NormalizedNode<?, ?>> optional = read.checkedGet();
+        tx2.abort();
+        localHistory.close();
+
         shardRegistration.close().toCompletableFuture().get();
 
     }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.