Refactor snapshot message processing to RaftActorSnapshotMessageSupport
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTest.java
index aef25827e70659ba2971eb30b913af54bf2dbbb9..103d18bd1cc81defaa8d0a3c5ac406dc11a56ce8 100644 (file)
@@ -16,9 +16,8 @@ import akka.actor.Props;
 import akka.dispatch.Dispatchers;
 import akka.dispatch.OnComplete;
 import akka.japi.Creator;
-import akka.japi.Procedure;
 import akka.pattern.Patterns;
-import akka.persistence.SnapshotSelectionCriteria;
+import akka.persistence.SaveSnapshotSuccess;
 import akka.testkit.TestActorRef;
 import akka.util.Timeout;
 import com.google.common.base.Function;
@@ -41,6 +40,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.junit.Test;
 import org.mockito.InOrder;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
@@ -122,7 +122,7 @@ public class ShardTest extends AbstractShardTest {
                     "testRegisterChangeListener-DataChangeListener");
 
             shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
-                    dclActor.path(), AsyncDataBroker.DataChangeScope.BASE), getRef());
+                    dclActor, AsyncDataBroker.DataChangeScope.BASE), getRef());
 
             RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
                     RegisterChangeListenerReply.class);
@@ -160,8 +160,12 @@ public class ShardTest extends AbstractShardTest {
 
                 @Override
                 public Shard create() throws Exception {
+                    // Use a non persistent provider because this test actually invokes persist on the journal
+                    // this will cause all other messages to not be queued properly after that.
+                    // The basic issue is that you cannot use TestActorRef with a persistent actor (at least when
+                    // it does do a persist)
                     return new Shard(shardID, Collections.<String,String>emptyMap(),
-                            newDatastoreContext(), SCHEMA_CONTEXT) {
+                            dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
                         @Override
                         public void onReceiveCommand(final Object message) throws Exception {
                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
@@ -210,7 +214,7 @@ public class ShardTest extends AbstractShardTest {
                     onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
 
             // Now send the RegisterChangeListener and wait for the reply.
-            shard.tell(new RegisterChangeListener(path, dclActor.path(),
+            shard.tell(new RegisterChangeListener(path, dclActor,
                     AsyncDataBroker.DataChangeScope.SUBTREE), getRef());
 
             RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
@@ -935,7 +939,7 @@ public class ShardTest extends AbstractShardTest {
 
                 // Use MBean for verification
                 // Committed transaction count should increase as usual
-                assertEquals(1,shard.underlyingActor().getShardMBean().getCommittedTransactionsCount());
+                assertEquals(1, shard.underlyingActor().getShardMBean().getCommittedTransactionsCount());
 
                 // Commit index should advance as we do not have an empty modification
                 assertEquals(0, shard.underlyingActor().getShardMBean().getCommitIndex());
@@ -1386,68 +1390,39 @@ public class ShardTest extends AbstractShardTest {
     @SuppressWarnings("serial")
     public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception{
 
-        final AtomicReference<Object> savedSnapshot = new AtomicReference<>();
-        class DelegatingPersistentDataProvider implements DataPersistenceProvider {
-            DataPersistenceProvider delegate;
-
-            DelegatingPersistentDataProvider(DataPersistenceProvider delegate) {
-                this.delegate = delegate;
-            }
-
-            @Override
-            public boolean isRecoveryApplicable() {
-                return delegate.isRecoveryApplicable();
-            }
+        final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
 
-            @Override
-            public <T> void persist(T o, Procedure<T> procedure) {
-                delegate.persist(o, procedure);
+        final AtomicReference<Object> savedSnapshot = new AtomicReference<>();
+        class TestPersistentDataProvider extends DelegatingPersistentDataProvider {
+            TestPersistentDataProvider(DataPersistenceProvider delegate) {
+                super(delegate);
             }
 
             @Override
             public void saveSnapshot(Object o) {
                 savedSnapshot.set(o);
-                delegate.saveSnapshot(o);
-            }
-
-            @Override
-            public void deleteSnapshots(SnapshotSelectionCriteria criteria) {
-                delegate.deleteSnapshots(criteria);
-            }
-
-            @Override
-            public void deleteMessages(long sequenceNumber) {
-                delegate.deleteMessages(sequenceNumber);
+                super.saveSnapshot(o);
             }
         }
 
         dataStoreContextBuilder.persistent(persistent);
 
-
-
         new ShardTestKit(getSystem()) {{
-            final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
-
             class TestShard extends Shard {
 
                 protected TestShard(ShardIdentifier name, Map<String, String> peerAddresses,
                                     DatastoreContext datastoreContext, SchemaContext schemaContext) {
                     super(name, peerAddresses, datastoreContext, schemaContext);
+                    setPersistence(new TestPersistentDataProvider(super.persistence()));
                 }
 
-                DelegatingPersistentDataProvider delegating;
+                @Override
+                public void handleCommand(Object message) {
+                    super.handleCommand(message);
 
-                protected DataPersistenceProvider persistence() {
-                    if(delegating == null) {
-                        delegating = new DelegatingPersistentDataProvider(super.persistence());
+                    if (message instanceof SaveSnapshotSuccess || message.equals("commit_snapshot")) {
+                        latch.get().countDown();
                     }
-                    return delegating;
-                }
-
-                @Override
-                protected void commitSnapshot(final long sequenceNumber) {
-                    super.commitSnapshot(sequenceNumber);
-                    latch.get().countDown();
                 }
 
                 @Override
@@ -1556,14 +1531,14 @@ public class ShardTest extends AbstractShardTest {
             TestActorRef<Shard> shard1 = TestActorRef.create(getSystem(),
                     persistentProps, "testPersistence1");
 
-            assertTrue("Recovery Applicable", shard1.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
+            assertTrue("Recovery Applicable", shard1.underlyingActor().persistence().isRecoveryApplicable());
 
             shard1.tell(PoisonPill.getInstance(), ActorRef.noSender());
 
             TestActorRef<Shard> shard2 = TestActorRef.create(getSystem(),
                     nonPersistentProps, "testPersistence2");
 
-            assertFalse("Recovery Not Applicable", shard2.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
+            assertFalse("Recovery Not Applicable", shard2.underlyingActor().persistence().isRecoveryApplicable());
 
             shard2.tell(PoisonPill.getInstance(), ActorRef.noSender());
 
@@ -1579,19 +1554,19 @@ public class ShardTest extends AbstractShardTest {
             TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testOnDatastoreContext");
 
             assertEquals("isRecoveryApplicable", true,
-                    shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
+                    shard.underlyingActor().persistence().isRecoveryApplicable());
 
             waitUntilLeader(shard);
 
             shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender());
 
             assertEquals("isRecoveryApplicable", false,
-                    shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
+                    shard.underlyingActor().persistence().isRecoveryApplicable());
 
             shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender());
 
             assertEquals("isRecoveryApplicable", true,
-                    shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
+                    shard.underlyingActor().persistence().isRecoveryApplicable());
 
             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
         }};