From: Robert Varga Date: Wed, 17 May 2017 21:39:55 +0000 (+0200) Subject: BUG-8491: Remove requests as they are replayed X-Git-Tag: release/nitrogen~194 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=28551609a31799a43d3017ba0681e198f5136d70 BUG-8491: Remove requests as they are replayed We should not be seeing any messages just after we have finished message replay, as the queue is still locked and we should have accounted for all messages by removing them from the queue. Change-Id: I47396b4705e048460934538acc470468a0a6285d Signed-off-by: Robert Varga Signed-off-by: Tomas Cere (cherry picked from commit 585e116247f9b616579ffad1785a972621d928e7) --- diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java index 4dfe43b089..47c0676979 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java @@ -13,6 +13,7 @@ import com.google.common.base.MoreObjects; import com.google.common.base.MoreObjects.ToStringHelper; import com.google.common.base.Preconditions; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.util.Iterator; import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; @@ -162,13 +163,21 @@ public abstract class AbstractClientConnection { @GuardedBy("lock") final void finishReplay(final ReconnectForwarder forwarder) { - setForwarder(forwarder); + queue.setForwarder(forwarder); lock.unlock(); } @GuardedBy("lock") final void setForwarder(final ReconnectForwarder forwarder) { - queue.setForwarder(forwarder, currentTime()); + final long now = currentTime(); + final Iterator it = queue.asIterable().iterator(); + while (it.hasNext()) { + final ConnectionEntry e = it.next(); + forwarder.forwardEntry(e, now); + it.remove(); + } + + queue.setForwarder(forwarder); } @GuardedBy("lock") diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java index b2497fc7e7..28902d28ba 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java @@ -232,22 +232,13 @@ abstract class TransmitQueue { poisonQueue(pending, cause); } - final void setForwarder(final ReconnectForwarder forwarder, final long now) { + final void setForwarder(final ReconnectForwarder forwarder) { Verify.verify(successor == null, "Successor {} already set on connection {}", successor, this); - successor = Preconditions.checkNotNull(forwarder); - LOG.debug("Connection {} superseded by {}, splicing queue", this, successor); - - ConnectionEntry entry = inflight.poll(); - while (entry != null) { - successor.forwardEntry(entry, now); - entry = inflight.poll(); - } + Verify.verify(inflight.isEmpty(), "In-flight requests after replay: %s", inflight); + Verify.verify(pending.isEmpty(), "Pending requests after replay: %s", pending); - entry = pending.poll(); - while (entry != null) { - successor.forwardEntry(entry, now); - entry = pending.poll(); - } + successor = Preconditions.checkNotNull(forwarder); + LOG.debug("Connection {} superseded by {}", this, successor); } final void remove(final long now) { diff --git a/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnectionTest.java b/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnectionTest.java index 6a833a8dd0..9fd3a21d8f 100644 --- a/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnectionTest.java +++ b/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnectionTest.java @@ -8,8 +8,6 @@ package org.opendaylight.controller.cluster.access.client; import static org.hamcrest.CoreMatchers.hasItems; -import static org.mockito.Matchers.anyLong; -import static org.mockito.Matchers.argThat; import static org.mockito.Matchers.isA; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.timeout; @@ -20,6 +18,7 @@ import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.testkit.JavaTestKit; import akka.testkit.TestProbe; +import com.google.common.collect.Iterables; import java.util.Optional; import java.util.function.Consumer; import org.junit.After; @@ -145,10 +144,10 @@ public abstract class AbstractClientConnectionTest entries = connection.startReplay(); Assert.assertThat(entries, hasItems(entryWithRequest(request1), entryWithRequest(request2))); + Assert.assertEquals(2, Iterables.size(entries)); + Iterables.removeIf(entries, e -> true); final ReconnectForwarder forwarder = mock(ReconnectForwarder.class); connection.finishReplay(forwarder); - verify(forwarder).forwardEntry(argThat(entryWithRequest(request1)), anyLong()); - verify(forwarder).forwardEntry(argThat(entryWithRequest(request2)), anyLong()); } @After diff --git a/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/ConnectedClientConnectionTest.java b/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/ConnectedClientConnectionTest.java index 2c1159ef93..eb101520e3 100644 --- a/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/ConnectedClientConnectionTest.java +++ b/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/ConnectedClientConnectionTest.java @@ -19,7 +19,7 @@ public class ConnectedClientConnectionTest extends AbstractClientConnectionTest, BackendInfo> { @Override - protected ConnectedClientConnection createConnection() { + protected ConnectedClientConnection createConnection() { final BackendInfo backend = new BackendInfo(backendProbe.ref(), 0L, ABIVersion.BORON, 10); return new ConnectedClientConnection<>(context, 0L, backend); } diff --git a/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/TransmittingTransmitQueueTest.java b/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/TransmittingTransmitQueueTest.java index 358cc9d408..752c12771a 100644 --- a/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/TransmittingTransmitQueueTest.java +++ b/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/TransmittingTransmitQueueTest.java @@ -12,8 +12,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.eq; -import static org.mockito.Matchers.isA; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.opendaylight.controller.cluster.access.client.ConnectionEntryMatcher.entryWithRequest; @@ -144,11 +142,8 @@ public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref()); final Consumer> callback = createConsumerMock(); final ConnectionEntry entry = new ConnectionEntry(request, callback, ticker.read()); - queue.enqueue(entry, ticker.read()); final ReconnectForwarder forwarder = mock(ReconnectForwarder.class); - final long setForwarderNow = ticker.read(); - queue.setForwarder(forwarder, setForwarderNow); - verify(forwarder).forwardEntry(isA(TransmittedConnectionEntry.class), eq(setForwarderNow)); + queue.setForwarder(forwarder); final long secondEnqueueNow = ticker.read(); queue.enqueue(entry, secondEnqueueNow); verify(forwarder).forwardEntry(entry, secondEnqueueNow); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java index 8c3b485134..802d9ed0b3 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java @@ -11,6 +11,7 @@ import akka.actor.ActorRef; import com.google.common.base.Preconditions; import com.google.common.base.Verify; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.util.Iterator; import java.util.LinkedHashMap; import java.util.Map; import java.util.Optional; @@ -213,12 +214,15 @@ abstract class ProxyHistory implements Identifiable { @Override void replayRequests(final Iterable previousEntries) { // First look for our Create message - for (ConnectionEntry e : previousEntries) { + Iterator it = previousEntries.iterator(); + while (it.hasNext()) { + final ConnectionEntry e = it.next(); final Request req = e.getRequest(); if (identifier.equals(req.getTarget())) { Verify.verify(req instanceof LocalHistoryRequest); if (req instanceof CreateLocalHistoryRequest) { successor.connection.sendRequest(req, e.getCallback()); + it.remove(); break; } } @@ -233,11 +237,17 @@ abstract class ProxyHistory implements Identifiable { } // Now look for any finalizing messages - for (ConnectionEntry e : previousEntries) { + it = previousEntries.iterator(); + while (it.hasNext()) { + final ConnectionEntry e = it.next(); final Request req = e.getRequest(); if (identifier.equals(req.getTarget())) { Verify.verify(req instanceof LocalHistoryRequest); - successor.connection.sendRequest(req, e.getCallback()); + if (req instanceof DestroyLocalHistoryRequest) { + successor.connection.sendRequest(req, e.getCallback()); + it.remove(); + break; + } } } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTreeTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTreeTest.java index 8faaf6ac0c..b7b736e853 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTreeTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTreeTest.java @@ -26,6 +26,7 @@ import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Address; import akka.actor.AddressFromURIString; +import akka.actor.Props; import akka.cluster.Cluster; import akka.testkit.JavaTestKit; import com.google.common.base.Optional; @@ -39,6 +40,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CompletionStage; +import java.util.concurrent.TimeUnit; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -50,11 +52,16 @@ import org.mockito.Mockito; import org.mockito.MockitoAnnotations; import org.opendaylight.controller.cluster.ActorSystemProvider; import org.opendaylight.controller.cluster.access.concepts.MemberName; +import org.opendaylight.controller.cluster.databroker.actors.dds.ClientLocalHistory; +import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction; +import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient; +import org.opendaylight.controller.cluster.databroker.actors.dds.SimpleDataStoreClientActor; import org.opendaylight.controller.cluster.datastore.AbstractTest; import org.opendaylight.controller.cluster.datastore.DatastoreContext; import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder; import org.opendaylight.controller.cluster.datastore.DistributedDataStore; import org.opendaylight.controller.cluster.datastore.IntegrationTestKit; +import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils; import org.opendaylight.controller.cluster.dom.api.CDSDataTreeProducer; import org.opendaylight.controller.cluster.dom.api.CDSShardAccess; @@ -202,7 +209,7 @@ public class DistributedShardedDOMDataTreeTest extends AbstractTest { } @Test - public void testSingleNodeWrites() throws Exception { + public void testSingleNodeWritesAndRead() throws Exception { initEmptyDatastores(); final DistributedShardRegistration shardRegistration = waitOnAsyncTask( @@ -249,6 +256,28 @@ public class DistributedShardedDOMDataTreeTest extends AbstractTest { verifyNoMoreInteractions(mockedDataTreeListener); + final String shardName = ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()); + LOG.debug("Creating distributed datastore client for shard {}", shardName); + + final ActorContext actorContext = leaderDistributedDataStore.getActorContext(); + final Props distributedDataStoreClientProps = + SimpleDataStoreClientActor.props(actorContext.getCurrentMemberName(), + "Shard-" + shardName, actorContext, shardName); + + final ActorRef clientActor = leaderSystem.actorOf(distributedDataStoreClientProps); + final DataStoreClient distributedDataStoreClient = SimpleDataStoreClientActor + .getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS); + + final ClientLocalHistory localHistory = distributedDataStoreClient.createLocalHistory(); + final ClientTransaction tx2 = localHistory.createTransaction(); + final CheckedFuture>, + org.opendaylight.mdsal.common.api.ReadFailedException> read = + tx2.read(YangInstanceIdentifier.EMPTY); + + final Optional> optional = read.checkedGet(); + tx2.abort(); + localHistory.close(); + shardRegistration.close().toCompletableFuture().get(); }