2 * Copyright (c) 2016 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.raft;
10 import static org.junit.Assert.assertEquals;
11 import com.google.common.base.Optional;
12 import com.google.common.util.concurrent.Uninterruptibles;
13 import java.util.Arrays;
14 import java.util.HashSet;
15 import java.util.List;
16 import java.util.concurrent.TimeUnit;
17 import java.util.function.Consumer;
18 import org.junit.After;
19 import org.junit.Before;
20 import org.junit.Test;
21 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
22 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
23 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
24 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
25 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
26 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
27 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30 import akka.actor.ActorRef;
31 import akka.dispatch.Dispatchers;
32 import akka.testkit.JavaTestKit;
33 import akka.testkit.TestActorRef;
36 * Unit tests for migrated messages on recovery.
38 * @author Thomas Pantelis
40 public class MigratedMessagesTest extends AbstractActorTest {
41 static final Logger TEST_LOG = LoggerFactory.getLogger(MigratedMessagesTest.class);
43 private TestActorFactory factory;
47 factory = new TestActorFactory(getSystem());
51 public void tearDown() throws Exception {
53 InMemoryJournal.clear();
54 InMemorySnapshotStore.clear();
58 public void testSnapshotAfterStartupWithMigratedServerConfigPayloadAndPersistenceEnabled() {
59 TEST_LOG.info("testSnapshotAfterStartupWithMigratedServerConfigPayloadAndPersistenceEnabled starting");
60 doTestSnapshotAfterStartupWithMigratedServerConfigPayload(true);
61 TEST_LOG.info("testSnapshotAfterStartupWithMigratedServerConfigPayloadAndPersistenceEnabled ending");
65 public void testSnapshotAfterStartupWithMigratedServerConfigPayloadAndPersistenceDisabled() {
66 TEST_LOG.info("testSnapshotAfterStartupWithMigratedServerConfigPayloadAndPersistenceDisabled starting");
68 TestActorRef<MockRaftActor> actor = doTestSnapshotAfterStartupWithMigratedServerConfigPayload(false);
69 MockRaftActor mockRaftActor = actor.underlyingActor();
70 String id = mockRaftActor.persistenceId();
71 ConfigParams config = mockRaftActor.getRaftActorContext().getConfigParams();
73 factory.killActor(actor, new JavaTestKit(getSystem()));
75 actor = factory.createTestActor(MockRaftActor.builder().id(id).config(config).persistent(Optional.of(false)).props().
76 withDispatcher(Dispatchers.DefaultDispatcherId()), id);
77 mockRaftActor = actor.underlyingActor();
78 mockRaftActor.waitForRecoveryComplete();
80 assertEquals("electionTerm", 1,
81 mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
82 assertEquals("votedFor", id,
83 mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
85 TEST_LOG.info("testSnapshotAfterStartupWithMigratedServerConfigPayloadAndPersistenceDisabled ending");
89 public void testSnapshotAfterStartupWithMigratedUpdateElectionTermAndPersistenceEnabled() {
90 TEST_LOG.info("testSnapshotAfterStartupWithMigratedUpdateElectionTermAndPersistenceEnabled starting");
92 String persistenceId = factory.generateActorId("test-actor-");
94 org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm updateElectionTerm =
95 new org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm(5, persistenceId);
97 InMemoryJournal.addEntry(persistenceId, 1, updateElectionTerm);
99 doTestSnapshotAfterStartupWithMigratedMessage(persistenceId, true, snapshot -> {
100 assertEquals("getElectionVotedFor", persistenceId, snapshot.getElectionVotedFor());
101 assertEquals("getElectionTerm", 5, snapshot.getElectionTerm());
104 TEST_LOG.info("testSnapshotAfterStartupWithMigratedUpdateElectionTermAndPersistenceEnabled ending");
108 public void testSnapshotAfterStartupWithMigratedUpdateElectionTermAndPersistenceDisabled() {
109 TEST_LOG.info("testSnapshotAfterStartupWithMigratedUpdateElectionTermAndPersistenceDisabled starting");
111 String persistenceId = factory.generateActorId("test-actor-");
113 org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm updateElectionTerm =
114 new org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm(5, persistenceId);
116 InMemoryJournal.addEntry(persistenceId, 1, updateElectionTerm);
118 doTestSnapshotAfterStartupWithMigratedMessage(persistenceId, false, snapshot -> {
119 assertEquals("getElectionVotedFor", persistenceId, snapshot.getElectionVotedFor());
120 assertEquals("getElectionTerm", 5, snapshot.getElectionTerm());
123 TEST_LOG.info("testSnapshotAfterStartupWithMigratedUpdateElectionTermAndPersistenceDisabled ending");
127 public void testSnapshotAfterStartupWithMigratedApplyJournalEntries() {
128 TEST_LOG.info("testSnapshotAfterStartupWithMigratedApplyJournalEntries starting");
130 String persistenceId = factory.generateActorId("test-actor-");
132 InMemoryJournal.addEntry(persistenceId, 1, new UpdateElectionTerm(1, persistenceId));
133 InMemoryJournal.addEntry(persistenceId, 2, new ReplicatedLogImplEntry(0, 1,
134 new MockRaftActorContext.MockPayload("A")));
135 InMemoryJournal.addEntry(persistenceId, 3,
136 new org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries(0));
139 doTestSnapshotAfterStartupWithMigratedMessage(persistenceId, true, snapshot -> {
140 assertEquals("getLastAppliedIndex", 0, snapshot.getLastAppliedIndex());
141 assertEquals("getLastAppliedTerm", 1, snapshot.getLastAppliedTerm());
142 assertEquals("getLastIndex", 0, snapshot.getLastIndex());
143 assertEquals("getLastTerm", 1, snapshot.getLastTerm());
146 TEST_LOG.info("testSnapshotAfterStartupWithMigratedApplyJournalEntries ending");
150 public void testNoSnapshotAfterStartupWithNoMigratedMessages() {
151 String id = factory.generateActorId("test-actor-");
152 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
153 config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
155 RaftActorSnapshotCohort snapshotCohort = new RaftActorSnapshotCohort() {
157 public void createSnapshot(ActorRef actorRef) {
158 actorRef.tell(new CaptureSnapshotReply(new byte[0]), actorRef);
162 public void applySnapshot(byte[] snapshotBytes) {
166 TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(id).
167 config(config).snapshotCohort(snapshotCohort).persistent(Optional.of(true)).props().
168 withDispatcher(Dispatchers.DefaultDispatcherId()), id);
169 MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
171 mockRaftActor.waitForRecoveryComplete();
173 Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
175 List<Snapshot> snapshots = InMemorySnapshotStore.getSnapshots(id, Snapshot.class);
176 assertEquals("Snapshots", 0, snapshots.size());
179 private TestActorRef<MockRaftActor> doTestSnapshotAfterStartupWithMigratedServerConfigPayload(boolean persistent) {
180 String persistenceId = factory.generateActorId("test-actor-");
182 org.opendaylight.controller.cluster.raft.ServerConfigurationPayload persistedServerConfig =
183 new org.opendaylight.controller.cluster.raft.ServerConfigurationPayload(Arrays.asList(
184 new org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo(
185 persistenceId, true),
186 new org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo(
189 ServerConfigurationPayload expectedServerConfig = new ServerConfigurationPayload(Arrays.asList(
190 new ServerInfo(persistenceId, true), new ServerInfo("downNode", true)));
192 InMemoryJournal.addEntry(persistenceId, 1, new UpdateElectionTerm(1, persistenceId));
193 InMemoryJournal.addEntry(persistenceId, 3, new ReplicatedLogImplEntry(0, 1, persistedServerConfig));
195 TestActorRef<MockRaftActor> actor = doTestSnapshotAfterStartupWithMigratedMessage(persistenceId,
196 persistent, snapshot -> {
197 assertEquals("getElectionVotedFor", persistenceId, snapshot.getElectionVotedFor());
198 assertEquals("getElectionTerm", 1, snapshot.getElectionTerm());
199 assertEquals("getServerConfiguration", new HashSet<>(expectedServerConfig.getServerConfig()),
200 new HashSet<>(snapshot.getServerConfiguration().getServerConfig()));
207 private TestActorRef<MockRaftActor> doTestSnapshotAfterStartupWithMigratedMessage(String id, boolean persistent,
208 Consumer<Snapshot> snapshotVerifier) {
209 InMemorySnapshotStore.addSnapshotSavedLatch(id);
210 InMemoryJournal.addDeleteMessagesCompleteLatch(id);
211 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
212 config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
214 RaftActorSnapshotCohort snapshotCohort = new RaftActorSnapshotCohort() {
216 public void createSnapshot(ActorRef actorRef) {
217 actorRef.tell(new CaptureSnapshotReply(new byte[0]), actorRef);
221 public void applySnapshot(byte[] snapshotBytes) {
225 TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(id).
226 config(config).snapshotCohort(snapshotCohort).persistent(Optional.of(persistent)).props().
227 withDispatcher(Dispatchers.DefaultDispatcherId()), id);
228 MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
230 mockRaftActor.waitForRecoveryComplete();
232 Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class);
233 snapshotVerifier.accept(snapshot);
235 InMemoryJournal.waitForDeleteMessagesComplete(id);
237 assertEquals("InMemoryJournal size", 0, InMemoryJournal.get(id).size());