X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FRaftActorTest.java;h=c833a86e9b825c926002ce5fc5b163438a6a4ce0;hp=ca864eb426ca48925b4abc27d0fed75e9bde7ebc;hb=d564bfe7b9b24474cc0426a859cfae8dbad8b571;hpb=6a4c3c11f68c52d00d2bc7f0b30b086113ebe859 diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java index ca864eb426..c833a86e9b 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java @@ -21,40 +21,42 @@ import com.google.common.base.Optional; import com.google.common.collect.Lists; import com.google.common.util.concurrent.Uninterruptibles; import com.google.protobuf.ByteString; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.junit.After; import org.junit.Assert; import org.junit.Test; import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor; +import org.opendaylight.controller.cluster.notifications.RoleChanged; import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; +import org.opendaylight.controller.cluster.raft.behaviors.Follower; +import org.opendaylight.controller.cluster.raft.behaviors.Leader; import org.opendaylight.controller.cluster.raft.client.messages.FindLeader; import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; +import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; import org.opendaylight.controller.cluster.raft.utils.MockAkkaJournal; import org.opendaylight.controller.cluster.raft.utils.MockSnapshotStore; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; @@ -81,31 +83,37 @@ public class RaftActorTest extends AbstractActorTest { private final DataPersistenceProvider dataPersistenceProvider; private final RaftActor delegate; + private final CountDownLatch recoveryComplete = new CountDownLatch(1); + private final List state; + private ActorRef roleChangeNotifier; public static final class MockRaftActorCreator implements Creator { + private static final long serialVersionUID = 1L; private final Map peerAddresses; private final String id; private final Optional config; private final DataPersistenceProvider dataPersistenceProvider; + private final ActorRef roleChangeNotifier; private MockRaftActorCreator(Map peerAddresses, String id, - Optional config, DataPersistenceProvider dataPersistenceProvider) { + Optional config, DataPersistenceProvider dataPersistenceProvider, + ActorRef roleChangeNotifier) { this.peerAddresses = peerAddresses; this.id = id; this.config = config; this.dataPersistenceProvider = dataPersistenceProvider; + this.roleChangeNotifier = roleChangeNotifier; } @Override public MockRaftActor create() throws Exception { - return new MockRaftActor(id, peerAddresses, config, dataPersistenceProvider); + MockRaftActor mockRaftActor = new MockRaftActor(id, peerAddresses, config, + dataPersistenceProvider); + mockRaftActor.roleChangeNotifier = this.roleChangeNotifier; + return mockRaftActor; } } - private final CountDownLatch recoveryComplete = new CountDownLatch(1); - - private final List state; - public MockRaftActor(String id, Map peerAddresses, Optional config, DataPersistenceProvider dataPersistenceProvider) { super(id, peerAddresses, config); state = new ArrayList<>(); @@ -131,23 +139,24 @@ public class RaftActorTest extends AbstractActorTest { public static Props props(final String id, final Map peerAddresses, Optional config){ - return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null)); + return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, null)); } public static Props props(final String id, final Map peerAddresses, Optional config, DataPersistenceProvider dataPersistenceProvider){ - return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider)); + return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, null)); } + public static Props props(final String id, final Map peerAddresses, + Optional config, ActorRef roleChangeNotifier){ + return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, roleChangeNotifier)); + } @Override protected void applyState(ActorRef clientActor, String identifier, Object data) { delegate.applyState(clientActor, identifier, data); LOG.info("applyState called"); } - - - @Override protected void startLogRecoveryBatch(int maxBatchSize) { } @@ -174,7 +183,7 @@ public class RaftActorTest extends AbstractActorTest { Object data = toObject(snapshot); System.out.println("!!!!!applyRecoverySnapshot: "+data); if (data instanceof List) { - state.addAll((List) data); + state.addAll((List) data); } } catch (Exception e) { e.printStackTrace(); @@ -198,6 +207,11 @@ public class RaftActorTest extends AbstractActorTest { return this.dataPersistenceProvider; } + @Override + protected Optional getRoleChangeNotifier() { + return Optional.fromNullable(roleChangeNotifier); + } + @Override public String persistenceId() { return this.getId(); } @@ -235,7 +249,7 @@ public class RaftActorTest extends AbstractActorTest { super(actorSystem); raftActor = this.getSystem().actorOf(MockRaftActor.props(actorName, - Collections.EMPTY_MAP, Optional.absent()), actorName); + Collections.emptyMap(), Optional.absent()), actorName); } @@ -244,7 +258,7 @@ public class RaftActorTest extends AbstractActorTest { return raftActor; } - public boolean waitForLogMessage(final Class logEventClass, String message){ + public boolean waitForLogMessage(final Class logEventClass, String message){ // Wait for a specific log message to show up return new JavaTestKit.EventFilter(logEventClass @@ -312,7 +326,7 @@ public class RaftActorTest extends AbstractActorTest { config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); ActorRef followerActor = getSystem().actorOf(MockRaftActor.props(persistenceId, - Collections.EMPTY_MAP, Optional.of(config)), persistenceId); + Collections.emptyMap(), Optional.of(config)), persistenceId); watch(followerActor); @@ -366,7 +380,7 @@ public class RaftActorTest extends AbstractActorTest { //reinstate the actor TestActorRef ref = TestActorRef.create(getSystem(), - MockRaftActor.props(persistenceId, Collections.EMPTY_MAP, + MockRaftActor.props(persistenceId, Collections.emptyMap(), Optional.of(config))); ref.underlyingActor().waitForRecoveryComplete(); @@ -399,7 +413,7 @@ public class RaftActorTest extends AbstractActorTest { config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); TestActorRef mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId, - Collections.EMPTY_MAP, Optional.of(config)), persistenceId); + Collections.emptyMap(), Optional.of(config)), persistenceId); MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); @@ -468,7 +482,7 @@ public class RaftActorTest extends AbstractActorTest { config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); TestActorRef mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId, - Collections.EMPTY_MAP, Optional.of(config), new DataPersistenceProviderMonitor()), persistenceId); + Collections.emptyMap(), Optional.of(config), new DataPersistenceProviderMonitor()), persistenceId); MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); @@ -533,7 +547,7 @@ public class RaftActorTest extends AbstractActorTest { dataPersistenceProviderMonitor.setPersistLatch(persistLatch); TestActorRef mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId, - Collections.EMPTY_MAP, Optional.of(config), dataPersistenceProviderMonitor), persistenceId); + Collections.emptyMap(), Optional.of(config), dataPersistenceProviderMonitor), persistenceId); MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); @@ -557,11 +571,10 @@ public class RaftActorTest extends AbstractActorTest { config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); - DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class); TestActorRef mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId, - Collections.EMPTY_MAP, Optional.of(config), dataPersistenceProvider), persistenceId); + Collections.emptyMap(), Optional.of(config), dataPersistenceProvider), persistenceId); MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); @@ -590,7 +603,7 @@ public class RaftActorTest extends AbstractActorTest { DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class); TestActorRef mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId, - Collections.EMPTY_MAP, Optional.of(config), dataPersistenceProvider), persistenceId); + Collections.emptyMap(), Optional.of(config), dataPersistenceProvider), persistenceId); MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); @@ -600,7 +613,6 @@ public class RaftActorTest extends AbstractActorTest { verify(dataPersistenceProvider, times(2)).persist(anyObject(), any(Procedure.class)); - mockActorRef.tell(PoisonPill.getInstance(), getRef()); } @@ -620,7 +632,7 @@ public class RaftActorTest extends AbstractActorTest { DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class); TestActorRef mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId, - Collections.EMPTY_MAP, Optional.of(config), dataPersistenceProvider), persistenceId); + Collections.emptyMap(), Optional.of(config), dataPersistenceProvider), persistenceId); MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); @@ -646,8 +658,9 @@ public class RaftActorTest extends AbstractActorTest { DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class); - TestActorRef mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId, - Collections.EMPTY_MAP, Optional.of(config), dataPersistenceProvider), persistenceId); + TestActorRef mockActorRef = TestActorRef.create(getSystem(), + MockRaftActor.props(persistenceId,Collections.emptyMap(), + Optional.of(config), dataPersistenceProvider), persistenceId); MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); @@ -659,6 +672,10 @@ public class RaftActorTest extends AbstractActorTest { mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1,1,-1,1)); + RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext(); + + mockRaftActor.setCurrentBehavior(new Leader(raftActorContext)); + mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes)); verify(dataPersistenceProvider).saveSnapshot(anyObject()); @@ -682,7 +699,7 @@ public class RaftActorTest extends AbstractActorTest { DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class); TestActorRef mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId, - Collections.EMPTY_MAP, Optional.of(config), dataPersistenceProvider), persistenceId); + Collections.emptyMap(), Optional.of(config), dataPersistenceProvider), persistenceId); MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); @@ -698,6 +715,9 @@ public class RaftActorTest extends AbstractActorTest { new MockRaftActorContext.MockPayload("C"), new MockRaftActorContext.MockPayload("D"))); + RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext(); + mockRaftActor.setCurrentBehavior(new Follower(raftActorContext)); + mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, 2, 1)); verify(mockRaftActor.delegate).createSnapshot(); @@ -710,8 +730,6 @@ public class RaftActorTest extends AbstractActorTest { verify(dataPersistenceProvider).deleteMessages(100); - assertNotNull("Snapshot should not be null", mockRaftActor.getReplicatedLog().getSnapshot()); - assertEquals(2, mockRaftActor.getReplicatedLog().size()); assertNotNull(mockRaftActor.getReplicatedLog().get(3)); @@ -740,7 +758,7 @@ public class RaftActorTest extends AbstractActorTest { DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class); TestActorRef mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId, - Collections.EMPTY_MAP, Optional.of(config), dataPersistenceProvider), persistenceId); + Collections.emptyMap(), Optional.of(config), dataPersistenceProvider), persistenceId); MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); @@ -755,8 +773,6 @@ public class RaftActorTest extends AbstractActorTest { } }; - - } @Test @@ -772,7 +788,7 @@ public class RaftActorTest extends AbstractActorTest { DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor(); TestActorRef mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId, - Collections.EMPTY_MAP, Optional.of(config), dataPersistenceProviderMonitor), persistenceId); + Collections.emptyMap(), Optional.of(config), dataPersistenceProviderMonitor), persistenceId); MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); @@ -780,13 +796,15 @@ public class RaftActorTest extends AbstractActorTest { oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,0,mock(Payload.class))); oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,1,mock(Payload.class))); - oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,2,mock(Payload.class))); + oldReplicatedLog.append( + new MockRaftActorContext.MockReplicatedLogEntry(1, 2, + mock(Payload.class))); ByteString snapshotBytes = fromObject(Arrays.asList( - new MockRaftActorContext.MockPayload("A"), - new MockRaftActorContext.MockPayload("B"), - new MockRaftActorContext.MockPayload("C"), - new MockRaftActorContext.MockPayload("D"))); + new MockRaftActorContext.MockPayload("A"), + new MockRaftActorContext.MockPayload("B"), + new MockRaftActorContext.MockPayload("C"), + new MockRaftActorContext.MockPayload("D"))); Snapshot snapshot = mock(Snapshot.class); @@ -798,9 +816,11 @@ public class RaftActorTest extends AbstractActorTest { verify(mockRaftActor.delegate).applySnapshot(eq(snapshotBytes)); - assertTrue("The replicatedLog should have changed", oldReplicatedLog != mockRaftActor.getReplicatedLog()); + assertTrue("The replicatedLog should have changed", + oldReplicatedLog != mockRaftActor.getReplicatedLog()); - assertEquals("lastApplied should be same as in the snapshot", (Long) 3L, mockRaftActor.getLastApplied()); + assertEquals("lastApplied should be same as in the snapshot", + (Long) 3L, mockRaftActor.getLastApplied()); assertEquals(0, mockRaftActor.getReplicatedLog().size()); @@ -823,7 +843,7 @@ public class RaftActorTest extends AbstractActorTest { DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor(); TestActorRef mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId, - Collections.EMPTY_MAP, Optional.of(config), dataPersistenceProviderMonitor), persistenceId); + Collections.emptyMap(), Optional.of(config), dataPersistenceProviderMonitor), persistenceId); MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); @@ -833,6 +853,10 @@ public class RaftActorTest extends AbstractActorTest { new MockRaftActorContext.MockPayload("C"), new MockRaftActorContext.MockPayload("D"))); + RaftActorContext raftActorContext = mockRaftActor.getRaftActorContext(); + + mockRaftActor.setCurrentBehavior(new Leader(raftActorContext)); + mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1,1,-1,1)); mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes)); @@ -843,14 +867,46 @@ public class RaftActorTest extends AbstractActorTest { assertEquals("Snapshot index should not have advanced because save snapshot failed", -1, mockRaftActor.getReplicatedLog().getSnapshotIndex()); - assertNull("Snapshot should be null", mockRaftActor.getReplicatedLog().getSnapshot()); - mockActorRef.tell(PoisonPill.getInstance(), getRef()); } }; } + @Test + public void testRaftRoleChangeNotifier() throws Exception { + new JavaTestKit(getSystem()) {{ + ActorRef notifierActor = getSystem().actorOf(Props.create(MessageCollectorActor.class)); + DefaultConfigParamsImpl config = new DefaultConfigParamsImpl(); + String id = "testRaftRoleChangeNotifier"; + + TestActorRef mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(id, + Collections.emptyMap(), Optional.of(config), notifierActor), id); + + MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); + mockRaftActor.setCurrentBehavior(new Follower(mockRaftActor.getRaftActorContext())); + + // sleeping for a minimum of 2 seconds, if it spans more its fine. + Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS); + + List matches = MessageCollectorActor.getAllMatching(notifierActor, RoleChanged.class); + assertNotNull(matches); + assertEquals(2, matches.size()); + + // check if the notifier got a role change from Follower to Candidate + RoleChanged raftRoleChanged = (RoleChanged) matches.get(0); + assertEquals(id, raftRoleChanged.getMemberId()); + assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole()); + assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole()); + + // check if the notifier got a role change from Candidate to Leader + raftRoleChanged = (RoleChanged) matches.get(1); + assertEquals(id, raftRoleChanged.getMemberId()); + assertEquals(RaftState.Candidate.name(), raftRoleChanged.getOldRole()); + assertEquals(RaftState.Leader.name(), raftRoleChanged.getNewRole()); + }}; + } + private ByteString fromObject(Object snapshot) throws Exception { ByteArrayOutputStream b = null; ObjectOutputStream o = null;