Calculate replicated log data size on recovery
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RaftActorTest.java
index 17a81ac3c39aa9c0a27743e1739892629a157ba8..14bfd1d348b69dc76332fc35ec8f8f94dd80e8db 100644 (file)
@@ -49,6 +49,7 @@ import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import javax.annotation.Nonnull;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -97,9 +98,11 @@ public class RaftActorTest extends AbstractActorTest {
         InMemorySnapshotStore.clear();
     }
 
-    public static class MockRaftActor extends RaftActor {
+    public static class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort {
 
-        private final RaftActor delegate;
+        private final RaftActor actorDelegate;
+        private final RaftActorRecoveryCohort recoveryCohortDelegate;
+        private final RaftActorSnapshotCohort snapshotCohortDelegate;
         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
         private final List<Object> state;
         private ActorRef roleChangeNotifier;
@@ -136,7 +139,9 @@ public class RaftActorTest extends AbstractActorTest {
                              DataPersistenceProvider dataPersistenceProvider) {
             super(id, peerAddresses, config);
             state = new ArrayList<>();
-            this.delegate = mock(RaftActor.class);
+            this.actorDelegate = mock(RaftActor.class);
+            this.recoveryCohortDelegate = mock(RaftActorRecoveryCohort.class);
+            this.snapshotCohortDelegate = mock(RaftActorSnapshotCohort.class);
             if(dataPersistenceProvider == null){
                 setPersistence(true);
             } else {
@@ -197,26 +202,37 @@ public class RaftActorTest extends AbstractActorTest {
 
 
         @Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
-            delegate.applyState(clientActor, identifier, data);
+            actorDelegate.applyState(clientActor, identifier, data);
             LOG.info("{}: applyState called", persistenceId());
         }
 
         @Override
-        protected void startLogRecoveryBatch(int maxBatchSize) {
+        @Nonnull
+        protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
+            return this;
         }
 
         @Override
-        protected void appendRecoveredLogEntry(Payload data) {
+        protected RaftActorSnapshotCohort getRaftActorSnapshotCohort() {
+            return this;
+        }
+
+        @Override
+        public void startLogRecoveryBatch(int maxBatchSize) {
+        }
+
+        @Override
+        public void appendRecoveredLogEntry(Payload data) {
             state.add(data);
         }
 
         @Override
-        protected void applyCurrentLogRecoveryBatch() {
+        public void applyCurrentLogRecoveryBatch() {
         }
 
         @Override
         protected void onRecoveryComplete() {
-            delegate.onRecoveryComplete();
+            actorDelegate.onRecoveryComplete();
             recoveryComplete.countDown();
         }
 
@@ -227,8 +243,8 @@ public class RaftActorTest extends AbstractActorTest {
         }
 
         @Override
-        protected void applyRecoverySnapshot(byte[] bytes) {
-            delegate.applyRecoverySnapshot(bytes);
+        public void applyRecoverySnapshot(byte[] bytes) {
+            recoveryCohortDelegate.applyRecoverySnapshot(bytes);
             try {
                 Object data = toObject(bytes);
                 if (data instanceof List) {
@@ -239,18 +255,21 @@ public class RaftActorTest extends AbstractActorTest {
             }
         }
 
-        @Override protected void createSnapshot() {
+        @Override
+        public void createSnapshot(ActorRef actorRef) {
             LOG.info("{}: createSnapshot called", persistenceId());
-            delegate.createSnapshot();
+            snapshotCohortDelegate.createSnapshot(actorRef);
         }
 
-        @Override protected void applySnapshot(byte [] snapshot) {
+        @Override
+        public void applySnapshot(byte [] snapshot) {
             LOG.info("{}: applySnapshot called", persistenceId());
-            delegate.applySnapshot(snapshot);
+            snapshotCohortDelegate.applySnapshot(snapshot);
         }
 
-        @Override protected void onStateChanged() {
-            delegate.onStateChanged();
+        @Override
+        protected void onStateChanged() {
+            actorDelegate.onStateChanged();
         }
 
         @Override
@@ -284,7 +303,6 @@ public class RaftActorTest extends AbstractActorTest {
         public ReplicatedLog getReplicatedLog(){
             return this.getRaftActorContext().getReplicatedLog();
         }
-
     }
 
 
@@ -399,11 +417,11 @@ public class RaftActorTest extends AbstractActorTest {
             // add more entries after snapshot is taken
             List<ReplicatedLogEntry> entries = new ArrayList<>();
             ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
-                    new MockRaftActorContext.MockPayload("F"));
+                    new MockRaftActorContext.MockPayload("F", 2));
             ReplicatedLogEntry entry3 = new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
-                    new MockRaftActorContext.MockPayload("G"));
+                    new MockRaftActorContext.MockPayload("G", 3));
             ReplicatedLogEntry entry4 = new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
-                    new MockRaftActorContext.MockPayload("H"));
+                    new MockRaftActorContext.MockPayload("H", 4));
             entries.add(entry2);
             entries.add(entry3);
             entries.add(entry4);
@@ -433,6 +451,7 @@ public class RaftActorTest extends AbstractActorTest {
             RaftActorContext context = ref.underlyingActor().getRaftActorContext();
             assertEquals("Journal log size", snapshotUnappliedEntries.size() + entries.size(),
                     context.getReplicatedLog().size());
+            assertEquals("Journal data size", 10, context.getReplicatedLog().dataSize());
             assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
             assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
             assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
@@ -516,7 +535,7 @@ public class RaftActorTest extends AbstractActorTest {
 
                 mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
 
-                verify(mockRaftActor.delegate).applyRecoverySnapshot(eq(snapshotBytes.toByteArray()));
+                verify(mockRaftActor.recoveryCohortDelegate).applyRecoverySnapshot(eq(snapshotBytes.toByteArray()));
 
                 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
 
@@ -583,7 +602,7 @@ public class RaftActorTest extends AbstractActorTest {
 
                 mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
 
-                verify(mockRaftActor.delegate, times(0)).applyRecoverySnapshot(any(byte[].class));
+                verify(mockRaftActor.recoveryCohortDelegate, times(0)).applyRecoverySnapshot(any(byte[].class));
 
                 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
 
@@ -810,7 +829,7 @@ public class RaftActorTest extends AbstractActorTest {
 
                 mockRaftActor.getRaftActorContext().getSnapshotManager().capture(lastEntry, replicatedToAllIndex);
 
-                verify(mockRaftActor.delegate).createSnapshot();
+                verify(mockRaftActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class));
 
                 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
 
@@ -860,7 +879,7 @@ public class RaftActorTest extends AbstractActorTest {
 
                 mockRaftActor.onReceiveCommand(new ApplyState(mockActorRef, "apply-state", entry));
 
-                verify(mockRaftActor.delegate).applyState(eq(mockActorRef), eq("apply-state"), anyObject());
+                verify(mockRaftActor.actorDelegate).applyState(eq(mockActorRef), eq("apply-state"), anyObject());
 
             }
         };
@@ -907,7 +926,7 @@ public class RaftActorTest extends AbstractActorTest {
 
                 mockRaftActor.onReceiveCommand(new ApplySnapshot(snapshot));
 
-                verify(mockRaftActor.delegate).applySnapshot(eq(snapshot.getState()));
+                verify(mockRaftActor.snapshotCohortDelegate).applySnapshot(eq(snapshot.getState()));
 
                 assertTrue("The replicatedLog should have changed",
                         oldReplicatedLog != mockRaftActor.getReplicatedLog());
@@ -1131,7 +1150,7 @@ public class RaftActorTest extends AbstractActorTest {
                         .capture(new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
                                 new MockRaftActorContext.MockPayload("x")), 4);
 
-                verify(leaderActor.delegate).createSnapshot();
+                verify(leaderActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class));
 
                 assertEquals(8, leaderActor.getReplicatedLog().size());
 
@@ -1230,7 +1249,7 @@ public class RaftActorTest extends AbstractActorTest {
                         new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
                                 new MockRaftActorContext.MockPayload("D")), 4);
 
-                verify(followerActor.delegate).createSnapshot();
+                verify(followerActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class));
 
                 assertEquals(6, followerActor.getReplicatedLog().size());