BUG-8491: Remove requests as they are replayed 68/57568/1
authorRobert Varga <robert.varga@pantheon.tech>
Wed, 17 May 2017 21:39:55 +0000 (23:39 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Fri, 19 May 2017 18:17:09 +0000 (20:17 +0200)
We should not be seeing any messages just after we have finished
message replay, as the queue is still locked and we should have
accounted for all messages by removing them from the queue.

Change-Id: I47396b4705e048460934538acc470468a0a6285d
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
Signed-off-by: Tomas Cere <tcere@cisco.com>
(cherry picked from commit 585e116247f9b616579ffad1785a972621d928e7)

opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java
opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnectionTest.java
opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/ConnectedClientConnectionTest.java
opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/TransmittingTransmitQueueTest.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTreeTest.java

index 4dfe43b0892ef2507ff658c17771797722fe6106..47c0676979b94ea291532e5b713aee0bbcbece56 100644 (file)
@@ -13,6 +13,7 @@ import com.google.common.base.MoreObjects;
 import com.google.common.base.MoreObjects.ToStringHelper;
 import com.google.common.base.Preconditions;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.util.Iterator;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
@@ -162,13 +163,21 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
 
     @GuardedBy("lock")
     final void finishReplay(final ReconnectForwarder forwarder) {
-        setForwarder(forwarder);
+        queue.setForwarder(forwarder);
         lock.unlock();
     }
 
     @GuardedBy("lock")
     final void setForwarder(final ReconnectForwarder forwarder) {
-        queue.setForwarder(forwarder, currentTime());
+        final long now = currentTime();
+        final Iterator<ConnectionEntry> it = queue.asIterable().iterator();
+        while (it.hasNext()) {
+            final ConnectionEntry e = it.next();
+            forwarder.forwardEntry(e, now);
+            it.remove();
+        }
+
+        queue.setForwarder(forwarder);
     }
 
     @GuardedBy("lock")
index b2497fc7e71798afcde0b7bec2ca624e7c99df49..28902d28ba2142f59af6dbc055a4933c2eb76b3a 100644 (file)
@@ -232,22 +232,13 @@ abstract class TransmitQueue {
         poisonQueue(pending, cause);
     }
 
-    final void setForwarder(final ReconnectForwarder forwarder, final long now) {
+    final void setForwarder(final ReconnectForwarder forwarder) {
         Verify.verify(successor == null, "Successor {} already set on connection {}", successor, this);
-        successor = Preconditions.checkNotNull(forwarder);
-        LOG.debug("Connection {} superseded by {}, splicing queue", this, successor);
-
-        ConnectionEntry entry = inflight.poll();
-        while (entry != null) {
-            successor.forwardEntry(entry, now);
-            entry = inflight.poll();
-        }
+        Verify.verify(inflight.isEmpty(), "In-flight requests after replay: %s", inflight);
+        Verify.verify(pending.isEmpty(), "Pending requests after replay: %s", pending);
 
-        entry = pending.poll();
-        while (entry != null) {
-            successor.forwardEntry(entry, now);
-            entry = pending.poll();
-        }
+        successor = Preconditions.checkNotNull(forwarder);
+        LOG.debug("Connection {} superseded by {}", this, successor);
     }
 
     final void remove(final long now) {
index 6a833a8dd0b181f98d1d98eab9f5643a6976a099..9fd3a21d8f7afde2def32f9b5376cbc38fcb710e 100644 (file)
@@ -8,8 +8,6 @@
 package org.opendaylight.controller.cluster.access.client;
 
 import static org.hamcrest.CoreMatchers.hasItems;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.argThat;
 import static org.mockito.Matchers.isA;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.timeout;
@@ -20,6 +18,7 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.testkit.JavaTestKit;
 import akka.testkit.TestProbe;
+import com.google.common.collect.Iterables;
 import java.util.Optional;
 import java.util.function.Consumer;
 import org.junit.After;
@@ -145,10 +144,10 @@ public abstract class AbstractClientConnectionTest<T extends AbstractClientConne
         connection.sendRequest(request2, callback);
         final Iterable<ConnectionEntry> entries = connection.startReplay();
         Assert.assertThat(entries, hasItems(entryWithRequest(request1), entryWithRequest(request2)));
+        Assert.assertEquals(2, Iterables.size(entries));
+        Iterables.removeIf(entries, e -> true);
         final ReconnectForwarder forwarder = mock(ReconnectForwarder.class);
         connection.finishReplay(forwarder);
-        verify(forwarder).forwardEntry(argThat(entryWithRequest(request1)), anyLong());
-        verify(forwarder).forwardEntry(argThat(entryWithRequest(request2)), anyLong());
     }
 
     @After
index 2c1159ef93977dc2efd223da6ac006c9b261885d..eb101520e316099e9a041c07d48cc7f68187e147 100644 (file)
@@ -19,7 +19,7 @@ public class ConnectedClientConnectionTest
         extends AbstractClientConnectionTest<ConnectedClientConnection<BackendInfo>, BackendInfo> {
 
     @Override
-    protected ConnectedClientConnection createConnection() {
+    protected ConnectedClientConnection<BackendInfo> createConnection() {
         final BackendInfo backend = new BackendInfo(backendProbe.ref(), 0L, ABIVersion.BORON, 10);
         return new ConnectedClientConnection<>(context, 0L, backend);
     }
index 358cc9d408e9cbe7dc47d37e0c06244342f5f8d8..752c12771a67bb755c2063b2a16a6b10c2f5a48f 100644 (file)
@@ -12,8 +12,6 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Matchers.isA;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.opendaylight.controller.cluster.access.client.ConnectionEntryMatcher.entryWithRequest;
@@ -144,11 +142,8 @@ public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest<Tra
         final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
         final Consumer<Response<?, ?>> callback = createConsumerMock();
         final ConnectionEntry entry = new ConnectionEntry(request, callback, ticker.read());
-        queue.enqueue(entry, ticker.read());
         final ReconnectForwarder forwarder = mock(ReconnectForwarder.class);
-        final long setForwarderNow = ticker.read();
-        queue.setForwarder(forwarder, setForwarderNow);
-        verify(forwarder).forwardEntry(isA(TransmittedConnectionEntry.class), eq(setForwarderNow));
+        queue.setForwarder(forwarder);
         final long secondEnqueueNow = ticker.read();
         queue.enqueue(entry, secondEnqueueNow);
         verify(forwarder).forwardEntry(entry, secondEnqueueNow);
index 8c3b485134c6f801f35f475c48cd33b1f5da3a6d..802d9ed0b33bc3a1fb5716a62d84bf03ffd266d3 100644 (file)
@@ -11,6 +11,7 @@ import akka.actor.ActorRef;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Verify;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Optional;
@@ -213,12 +214,15 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
         @Override
         void replayRequests(final Iterable<ConnectionEntry> previousEntries) {
             // First look for our Create message
-            for (ConnectionEntry e : previousEntries) {
+            Iterator<ConnectionEntry> it = previousEntries.iterator();
+            while (it.hasNext()) {
+                final ConnectionEntry e = it.next();
                 final Request<?, ?> req = e.getRequest();
                 if (identifier.equals(req.getTarget())) {
                     Verify.verify(req instanceof LocalHistoryRequest);
                     if (req instanceof CreateLocalHistoryRequest) {
                         successor.connection.sendRequest(req, e.getCallback());
+                        it.remove();
                         break;
                     }
                 }
@@ -233,11 +237,17 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
             }
 
             // Now look for any finalizing messages
-            for (ConnectionEntry e : previousEntries) {
+            it = previousEntries.iterator();
+            while (it.hasNext()) {
+                final ConnectionEntry e  = it.next();
                 final Request<?, ?> req = e.getRequest();
                 if (identifier.equals(req.getTarget())) {
                     Verify.verify(req instanceof LocalHistoryRequest);
-                    successor.connection.sendRequest(req, e.getCallback());
+                    if (req instanceof DestroyLocalHistoryRequest) {
+                        successor.connection.sendRequest(req, e.getCallback());
+                        it.remove();
+                        break;
+                    }
                 }
             }
         }
index 8faaf6ac0cca1a02567b6a44eac036907ec1fd8d..b7b736e853cfee3eb6391646d7ff869375b4b302 100644 (file)
@@ -26,6 +26,7 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.Address;
 import akka.actor.AddressFromURIString;
+import akka.actor.Props;
 import akka.cluster.Cluster;
 import akka.testkit.JavaTestKit;
 import com.google.common.base.Optional;
@@ -39,6 +40,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletionStage;
+import java.util.concurrent.TimeUnit;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -50,11 +52,16 @@ import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
 import org.opendaylight.controller.cluster.ActorSystemProvider;
 import org.opendaylight.controller.cluster.access.concepts.MemberName;
+import org.opendaylight.controller.cluster.databroker.actors.dds.ClientLocalHistory;
+import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
+import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
+import org.opendaylight.controller.cluster.databroker.actors.dds.SimpleDataStoreClientActor;
 import org.opendaylight.controller.cluster.datastore.AbstractTest;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
 import org.opendaylight.controller.cluster.datastore.IntegrationTestKit;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
 import org.opendaylight.controller.cluster.dom.api.CDSDataTreeProducer;
 import org.opendaylight.controller.cluster.dom.api.CDSShardAccess;
@@ -202,7 +209,7 @@ public class DistributedShardedDOMDataTreeTest extends AbstractTest {
     }
 
     @Test
-    public void testSingleNodeWrites() throws Exception {
+    public void testSingleNodeWritesAndRead() throws Exception {
         initEmptyDatastores();
 
         final DistributedShardRegistration shardRegistration = waitOnAsyncTask(
@@ -249,6 +256,28 @@ public class DistributedShardedDOMDataTreeTest extends AbstractTest {
 
         verifyNoMoreInteractions(mockedDataTreeListener);
 
+        final String shardName = ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier());
+        LOG.debug("Creating distributed datastore client for shard {}", shardName);
+
+        final ActorContext actorContext = leaderDistributedDataStore.getActorContext();
+        final Props distributedDataStoreClientProps =
+                SimpleDataStoreClientActor.props(actorContext.getCurrentMemberName(),
+                        "Shard-" + shardName, actorContext, shardName);
+
+        final ActorRef clientActor = leaderSystem.actorOf(distributedDataStoreClientProps);
+        final DataStoreClient distributedDataStoreClient = SimpleDataStoreClientActor
+                    .getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS);
+
+        final ClientLocalHistory localHistory = distributedDataStoreClient.createLocalHistory();
+        final ClientTransaction tx2 = localHistory.createTransaction();
+        final CheckedFuture<Optional<NormalizedNode<?, ?>>,
+                org.opendaylight.mdsal.common.api.ReadFailedException> read =
+                tx2.read(YangInstanceIdentifier.EMPTY);
+
+        final Optional<NormalizedNode<?, ?>> optional = read.checkedGet();
+        tx2.abort();
+        localHistory.close();
+
         shardRegistration.close().toCompletableFuture().get();
 
     }