2 * Copyright (c) 2016 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.raft;
10 import static org.junit.Assert.assertEquals;
12 import akka.actor.ActorRef;
13 import akka.dispatch.Dispatchers;
14 import akka.testkit.JavaTestKit;
15 import akka.testkit.TestActorRef;
16 import com.google.common.base.Optional;
17 import com.google.common.util.concurrent.Uninterruptibles;
18 import java.util.Arrays;
19 import java.util.HashSet;
20 import java.util.List;
21 import java.util.concurrent.TimeUnit;
22 import java.util.function.Consumer;
23 import org.junit.After;
24 import org.junit.Before;
25 import org.junit.Test;
26 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
27 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
28 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
29 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
30 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
31 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
32 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
37 * Unit tests for migrated messages on recovery.
39 * @author Thomas Pantelis
41 public class MigratedMessagesTest extends AbstractActorTest {
42 static final Logger TEST_LOG = LoggerFactory.getLogger(MigratedMessagesTest.class);
44 private TestActorFactory factory;
48 factory = new TestActorFactory(getSystem());
52 public void tearDown() throws Exception {
54 InMemoryJournal.clear();
55 InMemorySnapshotStore.clear();
59 public void testSnapshotAfterStartupWithMigratedServerConfigPayloadAndPersistenceEnabled() {
60 TEST_LOG.info("testSnapshotAfterStartupWithMigratedServerConfigPayloadAndPersistenceEnabled starting");
61 doTestSnapshotAfterStartupWithMigratedServerConfigPayload(true);
62 TEST_LOG.info("testSnapshotAfterStartupWithMigratedServerConfigPayloadAndPersistenceEnabled ending");
66 public void testSnapshotAfterStartupWithMigratedServerConfigPayloadAndPersistenceDisabled() {
67 TEST_LOG.info("testSnapshotAfterStartupWithMigratedServerConfigPayloadAndPersistenceDisabled starting");
69 TestActorRef<MockRaftActor> actor = doTestSnapshotAfterStartupWithMigratedServerConfigPayload(false);
70 MockRaftActor mockRaftActor = actor.underlyingActor();
71 String id = mockRaftActor.persistenceId();
72 ConfigParams config = mockRaftActor.getRaftActorContext().getConfigParams();
74 factory.killActor(actor, new JavaTestKit(getSystem()));
76 actor = factory.createTestActor(MockRaftActor.builder().id(id).config(config)
77 .persistent(Optional.of(false)).props().withDispatcher(Dispatchers.DefaultDispatcherId()), id);
78 mockRaftActor = actor.underlyingActor();
79 mockRaftActor.waitForRecoveryComplete();
81 assertEquals("electionTerm", 1,
82 mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
83 assertEquals("votedFor", id,
84 mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
86 TEST_LOG.info("testSnapshotAfterStartupWithMigratedServerConfigPayloadAndPersistenceDisabled ending");
90 public void testSnapshotAfterStartupWithMigratedUpdateElectionTermAndPersistenceEnabled() {
91 TEST_LOG.info("testSnapshotAfterStartupWithMigratedUpdateElectionTermAndPersistenceEnabled starting");
93 String persistenceId = factory.generateActorId("test-actor-");
95 org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm updateElectionTerm =
96 new org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm(5, persistenceId);
98 InMemoryJournal.addEntry(persistenceId, 1, updateElectionTerm);
100 doTestSnapshotAfterStartupWithMigratedMessage(persistenceId, true, snapshot -> {
101 assertEquals("getElectionVotedFor", persistenceId, snapshot.getElectionVotedFor());
102 assertEquals("getElectionTerm", 5, snapshot.getElectionTerm());
105 TEST_LOG.info("testSnapshotAfterStartupWithMigratedUpdateElectionTermAndPersistenceEnabled ending");
109 public void testSnapshotAfterStartupWithMigratedUpdateElectionTermAndPersistenceDisabled() {
110 TEST_LOG.info("testSnapshotAfterStartupWithMigratedUpdateElectionTermAndPersistenceDisabled starting");
112 String persistenceId = factory.generateActorId("test-actor-");
114 org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm updateElectionTerm =
115 new org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm(5, persistenceId);
117 InMemoryJournal.addEntry(persistenceId, 1, updateElectionTerm);
119 doTestSnapshotAfterStartupWithMigratedMessage(persistenceId, false, snapshot -> {
120 assertEquals("getElectionVotedFor", persistenceId, snapshot.getElectionVotedFor());
121 assertEquals("getElectionTerm", 5, snapshot.getElectionTerm());
124 TEST_LOG.info("testSnapshotAfterStartupWithMigratedUpdateElectionTermAndPersistenceDisabled ending");
128 public void testSnapshotAfterStartupWithMigratedApplyJournalEntries() {
129 TEST_LOG.info("testSnapshotAfterStartupWithMigratedApplyJournalEntries starting");
131 String persistenceId = factory.generateActorId("test-actor-");
133 InMemoryJournal.addEntry(persistenceId, 1, new UpdateElectionTerm(1, persistenceId));
134 InMemoryJournal.addEntry(persistenceId, 2, new ReplicatedLogImplEntry(0, 1,
135 new MockRaftActorContext.MockPayload("A")));
136 InMemoryJournal.addEntry(persistenceId, 3,
137 new org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries(0));
140 doTestSnapshotAfterStartupWithMigratedMessage(persistenceId, true, snapshot -> {
141 assertEquals("getLastAppliedIndex", 0, snapshot.getLastAppliedIndex());
142 assertEquals("getLastAppliedTerm", 1, snapshot.getLastAppliedTerm());
143 assertEquals("getLastIndex", 0, snapshot.getLastIndex());
144 assertEquals("getLastTerm", 1, snapshot.getLastTerm());
147 TEST_LOG.info("testSnapshotAfterStartupWithMigratedApplyJournalEntries ending");
151 public void testNoSnapshotAfterStartupWithNoMigratedMessages() {
152 String id = factory.generateActorId("test-actor-");
153 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
154 config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
156 RaftActorSnapshotCohort snapshotCohort = new RaftActorSnapshotCohort() {
158 public void createSnapshot(ActorRef actorRef) {
159 actorRef.tell(new CaptureSnapshotReply(new byte[0]), actorRef);
163 public void applySnapshot(byte[] snapshotBytes) {
167 TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(id)
168 .config(config).snapshotCohort(snapshotCohort).persistent(Optional.of(true)).props()
169 .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
170 MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
172 mockRaftActor.waitForRecoveryComplete();
174 Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
176 List<Snapshot> snapshots = InMemorySnapshotStore.getSnapshots(id, Snapshot.class);
177 assertEquals("Snapshots", 0, snapshots.size());
180 private TestActorRef<MockRaftActor> doTestSnapshotAfterStartupWithMigratedServerConfigPayload(boolean persistent) {
181 String persistenceId = factory.generateActorId("test-actor-");
183 org.opendaylight.controller.cluster.raft.ServerConfigurationPayload persistedServerConfig =
184 new org.opendaylight.controller.cluster.raft.ServerConfigurationPayload(Arrays.asList(
185 new org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo(
186 persistenceId, true),
187 new org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo(
190 ServerConfigurationPayload expectedServerConfig = new ServerConfigurationPayload(Arrays.asList(
191 new ServerInfo(persistenceId, true), new ServerInfo("downNode", true)));
193 InMemoryJournal.addEntry(persistenceId, 1, new UpdateElectionTerm(1, persistenceId));
194 InMemoryJournal.addEntry(persistenceId, 3, new ReplicatedLogImplEntry(0, 1, persistedServerConfig));
196 TestActorRef<MockRaftActor> actor = doTestSnapshotAfterStartupWithMigratedMessage(persistenceId,
197 persistent, snapshot -> {
198 assertEquals("getElectionVotedFor", persistenceId, snapshot.getElectionVotedFor());
199 assertEquals("getElectionTerm", 1, snapshot.getElectionTerm());
200 assertEquals("getServerConfiguration", new HashSet<>(expectedServerConfig.getServerConfig()),
201 new HashSet<>(snapshot.getServerConfiguration().getServerConfig()));
207 @SuppressWarnings("checkstyle:IllegalCatch")
208 private TestActorRef<MockRaftActor> doTestSnapshotAfterStartupWithMigratedMessage(String id, boolean persistent,
209 Consumer<Snapshot> snapshotVerifier) {
210 InMemorySnapshotStore.addSnapshotSavedLatch(id);
211 InMemoryJournal.addDeleteMessagesCompleteLatch(id);
212 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
213 config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
215 RaftActorSnapshotCohort snapshotCohort = new RaftActorSnapshotCohort() {
217 public void createSnapshot(ActorRef actorRef) {
218 actorRef.tell(new CaptureSnapshotReply(new byte[0]), actorRef);
222 public void applySnapshot(byte[] snapshotBytes) {
226 TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(id)
227 .config(config).snapshotCohort(snapshotCohort).persistent(Optional.of(persistent)).props()
228 .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
229 MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
231 mockRaftActor.waitForRecoveryComplete();
233 Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class);
234 snapshotVerifier.accept(snapshot);
236 InMemoryJournal.waitForDeleteMessagesComplete(id);
238 assertEquals("InMemoryJournal size", 0, InMemoryJournal.get(id).size());