Merge "BUG-1848 : OrderedNormalizedNodeWriter implementation"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTest.java
index 0fbe68665e057689bf608c5502ee2ebe9c58c4fc..e04c1a5d185ffbb5bc81b4d035a07545c02ef3a7 100644 (file)
@@ -16,9 +16,7 @@ import akka.actor.Props;
 import akka.dispatch.Dispatchers;
 import akka.dispatch.OnComplete;
 import akka.japi.Creator;
-import akka.japi.Procedure;
 import akka.pattern.Patterns;
-import akka.persistence.SnapshotSelectionCriteria;
 import akka.testkit.TestActorRef;
 import akka.util.Timeout;
 import com.google.common.base.Function;
@@ -41,6 +39,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.junit.Test;
 import org.mockito.InOrder;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
@@ -70,13 +69,13 @@ import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListene
 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
+import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
 import org.opendaylight.controller.cluster.raft.Snapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
-import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
@@ -100,6 +99,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
@@ -121,7 +121,7 @@ public class ShardTest extends AbstractShardTest {
                     "testRegisterChangeListener-DataChangeListener");
 
             shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
-                    dclActor.path(), AsyncDataBroker.DataChangeScope.BASE), getRef());
+                    dclActor, AsyncDataBroker.DataChangeScope.BASE), getRef());
 
             RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
                     RegisterChangeListenerReply.class);
@@ -159,8 +159,12 @@ public class ShardTest extends AbstractShardTest {
 
                 @Override
                 public Shard create() throws Exception {
-                    return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
-                            newDatastoreContext(), SCHEMA_CONTEXT) {
+                    // Use a non persistent provider because this test actually invokes persist on the journal
+                    // this will cause all other messages to not be queued properly after that.
+                    // The basic issue is that you cannot use TestActorRef with a persistent actor (at least when
+                    // it does do a persist)
+                    return new Shard(shardID, Collections.<String,String>emptyMap(),
+                            dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
                         @Override
                         public void onReceiveCommand(final Object message) throws Exception {
                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
@@ -209,7 +213,7 @@ public class ShardTest extends AbstractShardTest {
                     onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
 
             // Now send the RegisterChangeListener and wait for the reply.
-            shard.tell(new RegisterChangeListener(path, dclActor.path(),
+            shard.tell(new RegisterChangeListener(path, dclActor,
                     AsyncDataBroker.DataChangeScope.SUBTREE), getRef());
 
             RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
@@ -287,7 +291,7 @@ public class ShardTest extends AbstractShardTest {
             final CountDownLatch recoveryComplete = new CountDownLatch(1);
             class TestShard extends Shard {
                 TestShard() {
-                    super(shardID, Collections.<ShardIdentifier, String>singletonMap(shardID, null),
+                    super(shardID, Collections.<String, String>singletonMap(shardID.toString(), null),
                             newDatastoreContext(), SCHEMA_CONTEXT);
                 }
 
@@ -318,7 +322,7 @@ public class ShardTest extends AbstractShardTest {
                     Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
 
             String address = "akka://foobar";
-            shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID, address));
+            shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID.toString(), address));
 
             assertEquals("getPeerAddresses", address,
                     ((TestShard)shard.underlyingActor()).getPeerAddresses().get(shardID.toString()));
@@ -763,7 +767,7 @@ public class ShardTest extends AbstractShardTest {
             Creator<Shard> creator = new Creator<Shard>() {
                 @Override
                 public Shard create() throws Exception {
-                    return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
+                    return new Shard(shardID, Collections.<String,String>emptyMap(),
                             newDatastoreContext(), SCHEMA_CONTEXT) {
                         @Override
                         protected boolean isLeader() {
@@ -934,7 +938,7 @@ public class ShardTest extends AbstractShardTest {
 
                 // Use MBean for verification
                 // Committed transaction count should increase as usual
-                assertEquals(1,shard.underlyingActor().getShardMBean().getCommittedTransactionsCount());
+                assertEquals(1, shard.underlyingActor().getShardMBean().getCommittedTransactionsCount());
 
                 // Commit index should advance as we do not have an empty modification
                 assertEquals(0, shard.underlyingActor().getShardMBean().getCommitIndex());
@@ -1386,37 +1390,15 @@ public class ShardTest extends AbstractShardTest {
     public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception{
 
         final AtomicReference<Object> savedSnapshot = new AtomicReference<>();
-        class DelegatingPersistentDataProvider implements DataPersistenceProvider {
-            DataPersistenceProvider delegate;
-
-            DelegatingPersistentDataProvider(DataPersistenceProvider delegate) {
-                this.delegate = delegate;
-            }
-
-            @Override
-            public boolean isRecoveryApplicable() {
-                return delegate.isRecoveryApplicable();
-            }
-
-            @Override
-            public <T> void persist(T o, Procedure<T> procedure) {
-                delegate.persist(o, procedure);
+        class TestPersistentDataProvider extends DelegatingPersistentDataProvider {
+            TestPersistentDataProvider(DataPersistenceProvider delegate) {
+                super(delegate);
             }
 
             @Override
             public void saveSnapshot(Object o) {
                 savedSnapshot.set(o);
-                delegate.saveSnapshot(o);
-            }
-
-            @Override
-            public void deleteSnapshots(SnapshotSelectionCriteria criteria) {
-                delegate.deleteSnapshots(criteria);
-            }
-
-            @Override
-            public void deleteMessages(long sequenceNumber) {
-                delegate.deleteMessages(sequenceNumber);
+                super.saveSnapshot(o);
             }
         }
 
@@ -1424,29 +1406,32 @@ public class ShardTest extends AbstractShardTest {
 
         new ShardTestKit(getSystem()) {{
             final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
-            Creator<Shard> creator = new Creator<Shard>() {
-                @Override
-                public Shard create() throws Exception {
-                    return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
-                            newDatastoreContext(), SCHEMA_CONTEXT) {
 
-                        DelegatingPersistentDataProvider delegating;
+            class TestShard extends Shard {
 
-                        @Override
-                        protected DataPersistenceProvider persistence() {
-                            if(delegating == null) {
-                                delegating = new DelegatingPersistentDataProvider(super.persistence());
-                            }
+                protected TestShard(ShardIdentifier name, Map<String, String> peerAddresses,
+                                    DatastoreContext datastoreContext, SchemaContext schemaContext) {
+                    super(name, peerAddresses, datastoreContext, schemaContext);
+                    setPersistence(new TestPersistentDataProvider(super.persistence()));
+                }
 
-                            return delegating;
-                        }
+                @Override
+                protected void commitSnapshot(final long sequenceNumber) {
+                    super.commitSnapshot(sequenceNumber);
+                    latch.get().countDown();
+                }
 
-                        @Override
-                        protected void commitSnapshot(final long sequenceNumber) {
-                            super.commitSnapshot(sequenceNumber);
-                            latch.get().countDown();
-                        }
-                    };
+                @Override
+                public RaftActorContext getRaftActorContext() {
+                    return super.getRaftActorContext();
+                }
+            }
+
+            Creator<Shard> creator = new Creator<Shard>() {
+                @Override
+                public Shard create() throws Exception {
+                    return new TestShard(shardID, Collections.<String,String>emptyMap(),
+                            newDatastoreContext(), SCHEMA_CONTEXT);
                 }
             };
 
@@ -1459,8 +1444,9 @@ public class ShardTest extends AbstractShardTest {
 
             NormalizedNode<?,?> expectedRoot = readStore(shard, YangInstanceIdentifier.builder().build());
 
-            CaptureSnapshot capture = new CaptureSnapshot(-1, -1, -1, -1, -1, -1);
-            shard.tell(capture, getRef());
+            // Trigger creation of a snapshot by ensuring
+            RaftActorContext raftActorContext = ((TestShard) shard.underlyingActor()).getRaftActorContext();
+            raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
 
             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
 
@@ -1472,7 +1458,7 @@ public class ShardTest extends AbstractShardTest {
             latch.set(new CountDownLatch(1));
             savedSnapshot.set(null);
 
-            shard.tell(capture, getRef());
+            raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
 
             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
 
@@ -1528,27 +1514,27 @@ public class ShardTest extends AbstractShardTest {
         final DatastoreContext persistentContext = DatastoreContext.newBuilder().
                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
 
-        final Props persistentProps = Shard.props(shardID, Collections.<ShardIdentifier, String>emptyMap(),
+        final Props persistentProps = Shard.props(shardID, Collections.<String, String>emptyMap(),
                 persistentContext, SCHEMA_CONTEXT);
 
         final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder().
                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
 
-        final Props nonPersistentProps = Shard.props(shardID, Collections.<ShardIdentifier, String>emptyMap(),
+        final Props nonPersistentProps = Shard.props(shardID, Collections.<String, String>emptyMap(),
                 nonPersistentContext, SCHEMA_CONTEXT);
 
         new ShardTestKit(getSystem()) {{
             TestActorRef<Shard> shard1 = TestActorRef.create(getSystem(),
                     persistentProps, "testPersistence1");
 
-            assertTrue("Recovery Applicable", shard1.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
+            assertTrue("Recovery Applicable", shard1.underlyingActor().persistence().isRecoveryApplicable());
 
             shard1.tell(PoisonPill.getInstance(), ActorRef.noSender());
 
             TestActorRef<Shard> shard2 = TestActorRef.create(getSystem(),
                     nonPersistentProps, "testPersistence2");
 
-            assertFalse("Recovery Not Applicable", shard2.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
+            assertFalse("Recovery Not Applicable", shard2.underlyingActor().persistence().isRecoveryApplicable());
 
             shard2.tell(PoisonPill.getInstance(), ActorRef.noSender());
 
@@ -1564,19 +1550,19 @@ public class ShardTest extends AbstractShardTest {
             TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testOnDatastoreContext");
 
             assertEquals("isRecoveryApplicable", true,
-                    shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
+                    shard.underlyingActor().persistence().isRecoveryApplicable());
 
             waitUntilLeader(shard);
 
             shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender());
 
             assertEquals("isRecoveryApplicable", false,
-                    shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
+                    shard.underlyingActor().persistence().isRecoveryApplicable());
 
             shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender());
 
             assertEquals("isRecoveryApplicable", true,
-                    shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
+                    shard.underlyingActor().persistence().isRecoveryApplicable());
 
             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
         }};