X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardTest.java;h=cc96d0d3b0d070623c737dc8f78340c45a20539f;hb=994dbb10de67c0d9fd5f78a216ea372326609a49;hp=0fbe68665e057689bf608c5502ee2ebe9c58c4fc;hpb=00cc355c0c58e999ffebd531bca3a507e150e441;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index 0fbe68665e..cc96d0d3b0 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -70,13 +70,13 @@ import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListene import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils; import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener; import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply; +import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry; import org.opendaylight.controller.cluster.raft.Snapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; -import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout; import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; import org.opendaylight.controller.cluster.raft.client.messages.FindLeader; @@ -100,6 +100,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild; import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; @@ -121,7 +122,7 @@ public class ShardTest extends AbstractShardTest { "testRegisterChangeListener-DataChangeListener"); shard.tell(new RegisterChangeListener(TestModel.TEST_PATH, - dclActor.path(), AsyncDataBroker.DataChangeScope.BASE), getRef()); + dclActor, AsyncDataBroker.DataChangeScope.BASE), getRef()); RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"), RegisterChangeListenerReply.class); @@ -159,8 +160,12 @@ public class ShardTest extends AbstractShardTest { @Override public Shard create() throws Exception { - return new Shard(shardID, Collections.emptyMap(), - newDatastoreContext(), SCHEMA_CONTEXT) { + // Use a non persistent provider because this test actually invokes persist on the journal + // this will cause all other messages to not be queued properly after that. + // The basic issue is that you cannot use TestActorRef with a persistent actor (at least when + // it does do a persist) + return new Shard(shardID, Collections.emptyMap(), + dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) { @Override public void onReceiveCommand(final Object message) throws Exception { if(message instanceof ElectionTimeout && firstElectionTimeout) { @@ -209,7 +214,7 @@ public class ShardTest extends AbstractShardTest { onFirstElectionTimeout.await(5, TimeUnit.SECONDS)); // Now send the RegisterChangeListener and wait for the reply. - shard.tell(new RegisterChangeListener(path, dclActor.path(), + shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.SUBTREE), getRef()); RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"), @@ -287,7 +292,7 @@ public class ShardTest extends AbstractShardTest { final CountDownLatch recoveryComplete = new CountDownLatch(1); class TestShard extends Shard { TestShard() { - super(shardID, Collections.singletonMap(shardID, null), + super(shardID, Collections.singletonMap(shardID.toString(), null), newDatastoreContext(), SCHEMA_CONTEXT); } @@ -318,7 +323,7 @@ public class ShardTest extends AbstractShardTest { Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS)); String address = "akka://foobar"; - shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID, address)); + shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID.toString(), address)); assertEquals("getPeerAddresses", address, ((TestShard)shard.underlyingActor()).getPeerAddresses().get(shardID.toString())); @@ -763,7 +768,7 @@ public class ShardTest extends AbstractShardTest { Creator creator = new Creator() { @Override public Shard create() throws Exception { - return new Shard(shardID, Collections.emptyMap(), + return new Shard(shardID, Collections.emptyMap(), newDatastoreContext(), SCHEMA_CONTEXT) { @Override protected boolean isLeader() { @@ -934,7 +939,7 @@ public class ShardTest extends AbstractShardTest { // Use MBean for verification // Committed transaction count should increase as usual - assertEquals(1,shard.underlyingActor().getShardMBean().getCommittedTransactionsCount()); + assertEquals(1, shard.underlyingActor().getShardMBean().getCommittedTransactionsCount()); // Commit index should advance as we do not have an empty modification assertEquals(0, shard.underlyingActor().getShardMBean().getCommitIndex()); @@ -1422,31 +1427,44 @@ public class ShardTest extends AbstractShardTest { dataStoreContextBuilder.persistent(persistent); + + new ShardTestKit(getSystem()) {{ final AtomicReference latch = new AtomicReference<>(new CountDownLatch(1)); - Creator creator = new Creator() { - @Override - public Shard create() throws Exception { - return new Shard(shardID, Collections.emptyMap(), - newDatastoreContext(), SCHEMA_CONTEXT) { - DelegatingPersistentDataProvider delegating; + class TestShard extends Shard { - @Override - protected DataPersistenceProvider persistence() { - if(delegating == null) { - delegating = new DelegatingPersistentDataProvider(super.persistence()); - } + protected TestShard(ShardIdentifier name, Map peerAddresses, + DatastoreContext datastoreContext, SchemaContext schemaContext) { + super(name, peerAddresses, datastoreContext, schemaContext); + } - return delegating; - } + DelegatingPersistentDataProvider delegating; - @Override - protected void commitSnapshot(final long sequenceNumber) { - super.commitSnapshot(sequenceNumber); - latch.get().countDown(); - } - }; + protected DataPersistenceProvider persistence() { + if(delegating == null) { + delegating = new DelegatingPersistentDataProvider(super.persistence()); + } + return delegating; + } + + @Override + protected void commitSnapshot(final long sequenceNumber) { + super.commitSnapshot(sequenceNumber); + latch.get().countDown(); + } + + @Override + public RaftActorContext getRaftActorContext() { + return super.getRaftActorContext(); + } + } + + Creator creator = new Creator() { + @Override + public Shard create() throws Exception { + return new TestShard(shardID, Collections.emptyMap(), + newDatastoreContext(), SCHEMA_CONTEXT); } }; @@ -1459,8 +1477,9 @@ public class ShardTest extends AbstractShardTest { NormalizedNode expectedRoot = readStore(shard, YangInstanceIdentifier.builder().build()); - CaptureSnapshot capture = new CaptureSnapshot(-1, -1, -1, -1, -1, -1); - shard.tell(capture, getRef()); + // Trigger creation of a snapshot by ensuring + RaftActorContext raftActorContext = ((TestShard) shard.underlyingActor()).getRaftActorContext(); + raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1); assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS)); @@ -1472,7 +1491,7 @@ public class ShardTest extends AbstractShardTest { latch.set(new CountDownLatch(1)); savedSnapshot.set(null); - shard.tell(capture, getRef()); + raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1); assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS)); @@ -1528,13 +1547,13 @@ public class ShardTest extends AbstractShardTest { final DatastoreContext persistentContext = DatastoreContext.newBuilder(). shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build(); - final Props persistentProps = Shard.props(shardID, Collections.emptyMap(), + final Props persistentProps = Shard.props(shardID, Collections.emptyMap(), persistentContext, SCHEMA_CONTEXT); final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder(). shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build(); - final Props nonPersistentProps = Shard.props(shardID, Collections.emptyMap(), + final Props nonPersistentProps = Shard.props(shardID, Collections.emptyMap(), nonPersistentContext, SCHEMA_CONTEXT); new ShardTestKit(getSystem()) {{