2 * Copyright (c) 2016 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.raft;
10 import static org.junit.Assert.assertEquals;
12 import akka.actor.ActorRef;
13 import akka.dispatch.Dispatchers;
14 import akka.testkit.JavaTestKit;
15 import akka.testkit.TestActorRef;
16 import com.google.common.base.Optional;
17 import com.google.common.collect.ImmutableMap;
18 import com.google.common.util.concurrent.Uninterruptibles;
19 import java.util.Arrays;
20 import java.util.HashSet;
21 import java.util.List;
22 import java.util.concurrent.TimeUnit;
23 import java.util.function.Consumer;
24 import org.junit.After;
25 import org.junit.Before;
26 import org.junit.Test;
27 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
28 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
29 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
30 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
31 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
32 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
33 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
34 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
35 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
40 * Unit tests for migrated messages on recovery.
42 * @author Thomas Pantelis
44 public class MigratedMessagesTest extends AbstractActorTest {
45 static final Logger TEST_LOG = LoggerFactory.getLogger(MigratedMessagesTest.class);
47 private TestActorFactory factory;
51 factory = new TestActorFactory(getSystem());
55 public void tearDown() throws Exception {
57 InMemoryJournal.clear();
58 InMemorySnapshotStore.clear();
62 public void testSnapshotAfterStartupWithMigratedServerConfigPayloadAndPersistenceEnabled() {
63 TEST_LOG.info("testSnapshotAfterStartupWithMigratedServerConfigPayloadAndPersistenceEnabled starting");
64 doTestSnapshotAfterStartupWithMigratedServerConfigPayload(true);
65 TEST_LOG.info("testSnapshotAfterStartupWithMigratedServerConfigPayloadAndPersistenceEnabled ending");
69 public void testSnapshotAfterStartupWithMigratedServerConfigPayloadAndPersistenceDisabled() {
70 TEST_LOG.info("testSnapshotAfterStartupWithMigratedServerConfigPayloadAndPersistenceDisabled starting");
72 TestActorRef<MockRaftActor> actor = doTestSnapshotAfterStartupWithMigratedServerConfigPayload(false);
73 MockRaftActor mockRaftActor = actor.underlyingActor();
74 String id = mockRaftActor.persistenceId();
75 ConfigParams config = mockRaftActor.getRaftActorContext().getConfigParams();
77 factory.killActor(actor, new JavaTestKit(getSystem()));
79 actor = factory.createTestActor(MockRaftActor.builder().id(id).config(config)
80 .persistent(Optional.of(false)).props().withDispatcher(Dispatchers.DefaultDispatcherId()), id);
81 mockRaftActor = actor.underlyingActor();
82 mockRaftActor.waitForRecoveryComplete();
84 assertEquals("electionTerm", 1,
85 mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
86 assertEquals("votedFor", id,
87 mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
89 TEST_LOG.info("testSnapshotAfterStartupWithMigratedServerConfigPayloadAndPersistenceDisabled ending");
93 public void testSnapshotAfterStartupWithMigratedUpdateElectionTermAndPersistenceEnabled() {
94 TEST_LOG.info("testSnapshotAfterStartupWithMigratedUpdateElectionTermAndPersistenceEnabled starting");
96 String persistenceId = factory.generateActorId("test-actor-");
98 org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm updateElectionTerm =
99 new org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm(5, persistenceId);
101 InMemoryJournal.addEntry(persistenceId, 1, updateElectionTerm);
103 doTestSnapshotAfterStartupWithMigratedMessage(persistenceId, true, snapshot -> {
104 assertEquals("getElectionVotedFor", persistenceId, snapshot.getElectionVotedFor());
105 assertEquals("getElectionTerm", 5, snapshot.getElectionTerm());
108 TEST_LOG.info("testSnapshotAfterStartupWithMigratedUpdateElectionTermAndPersistenceEnabled ending");
112 public void testSnapshotAfterStartupWithMigratedUpdateElectionTermAndPersistenceDisabled() {
113 TEST_LOG.info("testSnapshotAfterStartupWithMigratedUpdateElectionTermAndPersistenceDisabled starting");
115 String persistenceId = factory.generateActorId("test-actor-");
117 org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm updateElectionTerm =
118 new org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm(5, persistenceId);
120 InMemoryJournal.addEntry(persistenceId, 1, updateElectionTerm);
122 doTestSnapshotAfterStartupWithMigratedMessage(persistenceId, false, snapshot -> {
123 assertEquals("getElectionVotedFor", persistenceId, snapshot.getElectionVotedFor());
124 assertEquals("getElectionTerm", 5, snapshot.getElectionTerm());
127 TEST_LOG.info("testSnapshotAfterStartupWithMigratedUpdateElectionTermAndPersistenceDisabled ending");
131 public void testSnapshotAfterStartupWithMigratedApplyJournalEntries() {
132 TEST_LOG.info("testSnapshotAfterStartupWithMigratedApplyJournalEntries starting");
134 String persistenceId = factory.generateActorId("test-actor-");
136 InMemoryJournal.addEntry(persistenceId, 1, new UpdateElectionTerm(1, persistenceId));
137 InMemoryJournal.addEntry(persistenceId, 2, new SimpleReplicatedLogEntry(0, 1,
138 new MockRaftActorContext.MockPayload("A")));
139 InMemoryJournal.addEntry(persistenceId, 3,
140 new org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries(0));
143 doTestSnapshotAfterStartupWithMigratedMessage(persistenceId, true, snapshot -> {
144 assertEquals("getLastAppliedIndex", 0, snapshot.getLastAppliedIndex());
145 assertEquals("getLastAppliedTerm", 1, snapshot.getLastAppliedTerm());
146 assertEquals("getLastIndex", 0, snapshot.getLastIndex());
147 assertEquals("getLastTerm", 1, snapshot.getLastTerm());
150 TEST_LOG.info("testSnapshotAfterStartupWithMigratedApplyJournalEntries ending");
154 public void testNoSnapshotAfterStartupWithNoMigratedMessages() {
155 TEST_LOG.info("testNoSnapshotAfterStartupWithNoMigratedMessages starting");
156 String id = factory.generateActorId("test-actor-");
158 InMemoryJournal.addEntry(id, 1, new UpdateElectionTerm(1, id));
159 InMemoryJournal.addEntry(id, 2, new SimpleReplicatedLogEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
160 InMemoryJournal.addEntry(id, 3, new ApplyJournalEntries(0));
162 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
163 config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
165 RaftActorSnapshotCohort snapshotCohort = new RaftActorSnapshotCohort() {
167 public void createSnapshot(ActorRef actorRef) {
168 actorRef.tell(new CaptureSnapshotReply(new byte[0]), actorRef);
172 public void applySnapshot(byte[] snapshotBytes) {
176 TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(id)
177 .config(config).snapshotCohort(snapshotCohort).persistent(Optional.of(true)).props()
178 .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
179 MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
181 mockRaftActor.waitForRecoveryComplete();
183 Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
185 List<Snapshot> snapshots = InMemorySnapshotStore.getSnapshots(id, Snapshot.class);
186 assertEquals("Snapshots", 0, snapshots.size());
188 TEST_LOG.info("testNoSnapshotAfterStartupWithNoMigratedMessages ending");
192 public void testSnapshotAfterStartupWithMigratedReplicatedLogEntry() {
193 TEST_LOG.info("testSnapshotAfterStartupWithMigratedReplicatedLogEntry starting");
195 String persistenceId = factory.generateActorId("test-actor-");
197 InMemoryJournal.addEntry(persistenceId, 1, new UpdateElectionTerm(1, persistenceId));
198 MockRaftActorContext.MockPayload expPayload = new MockRaftActorContext.MockPayload("A");
199 InMemoryJournal.addEntry(persistenceId, 2, new org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry(
202 doTestSnapshotAfterStartupWithMigratedMessage(persistenceId, true, snapshot -> {
203 assertEquals("Unapplied entries size", 1, snapshot.getUnAppliedEntries().size());
204 assertEquals("Unapplied entry term", 1, snapshot.getUnAppliedEntries().get(0).getTerm());
205 assertEquals("Unapplied entry index", 0, snapshot.getUnAppliedEntries().get(0).getIndex());
206 assertEquals("Unapplied entry data", expPayload, snapshot.getUnAppliedEntries().get(0).getData());
209 TEST_LOG.info("testSnapshotAfterStartupWithMigratedReplicatedLogEntry ending");
212 private TestActorRef<MockRaftActor> doTestSnapshotAfterStartupWithMigratedServerConfigPayload(boolean persistent) {
213 String persistenceId = factory.generateActorId("test-actor-");
215 org.opendaylight.controller.cluster.raft.ServerConfigurationPayload persistedServerConfig =
216 new org.opendaylight.controller.cluster.raft.ServerConfigurationPayload(Arrays.asList(
217 new org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo(
218 persistenceId, true),
219 new org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo(
222 ServerConfigurationPayload expectedServerConfig = new ServerConfigurationPayload(Arrays.asList(
223 new ServerInfo(persistenceId, true), new ServerInfo("downNode", true)));
225 InMemoryJournal.addEntry(persistenceId, 1, new UpdateElectionTerm(1, persistenceId));
226 InMemoryJournal.addEntry(persistenceId, 3, new SimpleReplicatedLogEntry(0, 1, persistedServerConfig));
228 TestActorRef<MockRaftActor> actor = doTestSnapshotAfterStartupWithMigratedMessage(persistenceId,
229 persistent, snapshot -> {
230 assertEquals("getElectionVotedFor", persistenceId, snapshot.getElectionVotedFor());
231 assertEquals("getElectionTerm", 1, snapshot.getElectionTerm());
232 assertEquals("getServerConfiguration", new HashSet<>(expectedServerConfig.getServerConfig()),
233 new HashSet<>(snapshot.getServerConfiguration().getServerConfig()));
239 @SuppressWarnings("checkstyle:IllegalCatch")
240 private TestActorRef<MockRaftActor> doTestSnapshotAfterStartupWithMigratedMessage(String id, boolean persistent,
241 Consumer<Snapshot> snapshotVerifier) {
242 InMemorySnapshotStore.addSnapshotSavedLatch(id);
243 InMemoryJournal.addDeleteMessagesCompleteLatch(id);
244 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
245 config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
247 RaftActorSnapshotCohort snapshotCohort = new RaftActorSnapshotCohort() {
249 public void createSnapshot(ActorRef actorRef) {
250 actorRef.tell(new CaptureSnapshotReply(new byte[0]), actorRef);
254 public void applySnapshot(byte[] snapshotBytes) {
258 TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(id)
259 .config(config).snapshotCohort(snapshotCohort).persistent(Optional.of(persistent))
260 .peerAddresses(ImmutableMap.of("peer", "")).props()
261 .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
262 MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
264 mockRaftActor.waitForRecoveryComplete();
266 Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class);
267 snapshotVerifier.accept(snapshot);
269 InMemoryJournal.waitForDeleteMessagesComplete(id);
271 assertEquals("InMemoryJournal size", 0, InMemoryJournal.get(id).size());