2 * Copyright (c) 2016 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.raft;
10 import static org.junit.Assert.assertEquals;
12 import akka.actor.ActorRef;
13 import akka.dispatch.Dispatchers;
14 import akka.testkit.JavaTestKit;
15 import akka.testkit.TestActorRef;
16 import com.google.common.base.Optional;
17 import com.google.common.collect.ImmutableMap;
18 import com.google.common.collect.Sets;
19 import com.google.common.io.ByteSource;
20 import com.google.common.util.concurrent.Uninterruptibles;
21 import java.io.OutputStream;
22 import java.io.Serializable;
23 import java.util.Arrays;
24 import java.util.HashSet;
25 import java.util.List;
26 import java.util.concurrent.TimeUnit;
27 import java.util.function.Consumer;
28 import org.apache.commons.lang3.SerializationUtils;
29 import org.junit.After;
30 import org.junit.Before;
31 import org.junit.Test;
32 import org.opendaylight.controller.cluster.raft.MockRaftActor.MockSnapshotState;
33 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
34 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
35 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
36 import org.opendaylight.controller.cluster.raft.persisted.ByteState;
37 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
38 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
39 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
40 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
41 import org.opendaylight.controller.cluster.raft.persisted.Snapshot.State;
42 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
43 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
44 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
45 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
50 * Unit tests for migrated messages on recovery.
52 * @author Thomas Pantelis
54 public class MigratedMessagesTest extends AbstractActorTest {
// SLF4J logger for this test class; the tests use it to log their "starting"/"ending" markers.
55 static final Logger TEST_LOG = LoggerFactory.getLogger(MigratedMessagesTest.class);
// Factory used by the tests to generate actor ids and to create and kill test actors.
57 private TestActorFactory factory;
// Create a new TestActorFactory bound to the actor system returned by getSystem().
61 factory = new TestActorFactory(getSystem());
/**
 * Test cleanup: calls clear() on the in-memory journal and on the in-memory snapshot store.
 * NOTE(review): presumably run after every test (JUnit's After is imported) so that persisted
 * state does not leak between test cases - confirm.
 */
65 public void tearDown() throws Exception {
67 InMemoryJournal.clear();
68 InMemorySnapshotStore.clear();
/**
 * Runs the migrated server-configuration-payload scenario with persistence enabled by delegating to
 * doTestSnapshotAfterStartupWithMigratedServerConfigPayload(true); start and end are logged.
 */
72 public void testSnapshotAfterStartupWithMigratedServerConfigPayloadAndPersistenceEnabled() {
73 TEST_LOG.info("testSnapshotAfterStartupWithMigratedServerConfigPayloadAndPersistenceEnabled starting");
74 doTestSnapshotAfterStartupWithMigratedServerConfigPayload(true);
75 TEST_LOG.info("testSnapshotAfterStartupWithMigratedServerConfigPayloadAndPersistenceEnabled ending");
/**
 * Runs the migrated server-configuration-payload scenario with persistence disabled, then kills the
 * actor, re-creates it with the same id and config (again non-persistent) and asserts that, once
 * recovery completes, the current election term is 1 and votedFor equals the actor's id.
 */
79 public void testSnapshotAfterStartupWithMigratedServerConfigPayloadAndPersistenceDisabled() {
80 TEST_LOG.info("testSnapshotAfterStartupWithMigratedServerConfigPayloadAndPersistenceDisabled starting");
// First run: the shared scenario with persistent == false.
82 TestActorRef<MockRaftActor> actor = doTestSnapshotAfterStartupWithMigratedServerConfigPayload(false);
83 MockRaftActor mockRaftActor = actor.underlyingActor();
// Capture id and config so the replacement actor can be built with the same identity.
84 String id = mockRaftActor.persistenceId();
85 ConfigParams config = mockRaftActor.getRaftActorContext().getConfigParams();
87 factory.killActor(actor, new JavaTestKit(getSystem()));
// Second run: re-create the actor under the same id and wait for it to recover.
89 actor = factory.createTestActor(MockRaftActor.builder().id(id).config(config)
90 .persistent(Optional.of(false)).props().withDispatcher(Dispatchers.DefaultDispatcherId()), id);
91 mockRaftActor = actor.underlyingActor();
92 mockRaftActor.waitForRecoveryComplete();
// The election term information must be available after the restart.
94 assertEquals("electionTerm", 1,
95 mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
96 assertEquals("votedFor", id,
97 mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
99 TEST_LOG.info("testSnapshotAfterStartupWithMigratedServerConfigPayloadAndPersistenceDisabled ending");
/**
 * Seeds the journal with the base.messages variant of UpdateElectionTerm (term 5, voted for this
 * actor's id) and runs the shared scenario with persistence enabled, verifying that the saved
 * snapshot reports election term 5 and votedFor equal to the persistence id.
 */
103 public void testSnapshotAfterStartupWithMigratedUpdateElectionTermAndPersistenceEnabled() {
104 TEST_LOG.info("testSnapshotAfterStartupWithMigratedUpdateElectionTermAndPersistenceEnabled starting");
106 String persistenceId = factory.generateActorId("test-actor-");
// Fully qualified on purpose: this is the base.messages class, not the imported persisted.UpdateElectionTerm.
108 org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm updateElectionTerm =
109 new org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm(5, persistenceId);
111 InMemoryJournal.addEntry(persistenceId, 1, updateElectionTerm);
// Snapshot verifier: the term and vote taken from the journal entry must show up in the snapshot.
113 doTestSnapshotAfterStartupWithMigratedMessage(persistenceId, true, snapshot -> {
114 assertEquals("getElectionVotedFor", persistenceId, snapshot.getElectionVotedFor());
115 assertEquals("getElectionTerm", 5, snapshot.getElectionTerm());
116 }, ByteState.empty());
118 TEST_LOG.info("testSnapshotAfterStartupWithMigratedUpdateElectionTermAndPersistenceEnabled ending");
/**
 * Same as the persistence-enabled UpdateElectionTerm test, but runs the shared scenario with
 * persistent == false: the journal is seeded with the base.messages variant of UpdateElectionTerm
 * (term 5, voted for this actor's id) and the saved snapshot must report that term and vote.
 */
122 public void testSnapshotAfterStartupWithMigratedUpdateElectionTermAndPersistenceDisabled() {
123 TEST_LOG.info("testSnapshotAfterStartupWithMigratedUpdateElectionTermAndPersistenceDisabled starting");
125 String persistenceId = factory.generateActorId("test-actor-");
// Fully qualified on purpose: this is the base.messages class, not the imported persisted.UpdateElectionTerm.
127 org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm updateElectionTerm =
128 new org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm(5, persistenceId);
130 InMemoryJournal.addEntry(persistenceId, 1, updateElectionTerm);
// Snapshot verifier: the term and vote taken from the journal entry must show up in the snapshot.
132 doTestSnapshotAfterStartupWithMigratedMessage(persistenceId, false, snapshot -> {
133 assertEquals("getElectionVotedFor", persistenceId, snapshot.getElectionVotedFor());
134 assertEquals("getElectionTerm", 5, snapshot.getElectionTerm());
135 }, ByteState.empty());
137 TEST_LOG.info("testSnapshotAfterStartupWithMigratedUpdateElectionTermAndPersistenceDisabled ending");
/**
 * Seeds the journal with an UpdateElectionTerm, one SimpleReplicatedLogEntry and the base.messages
 * variant of ApplyJournalEntries, then runs the shared scenario with persistence enabled and
 * verifies the last-applied and last index/term values of the saved snapshot.
 */
141 public void testSnapshotAfterStartupWithMigratedApplyJournalEntries() {
142 TEST_LOG.info("testSnapshotAfterStartupWithMigratedApplyJournalEntries starting");
144 String persistenceId = factory.generateActorId("test-actor-");
146 InMemoryJournal.addEntry(persistenceId, 1, new UpdateElectionTerm(1, persistenceId));
147 InMemoryJournal.addEntry(persistenceId, 2, new SimpleReplicatedLogEntry(0, 1,
148 new MockRaftActorContext.MockPayload("A")));
// Fully qualified on purpose: this is the base.messages class, not the imported persisted.ApplyJournalEntries.
149 InMemoryJournal.addEntry(persistenceId, 3,
150 new org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries(0));
// Snapshot verifier: expects last applied index/term 0/1 and last index/term 0/1.
153 doTestSnapshotAfterStartupWithMigratedMessage(persistenceId, true, snapshot -> {
154 assertEquals("getLastAppliedIndex", 0, snapshot.getLastAppliedIndex());
155 assertEquals("getLastAppliedTerm", 1, snapshot.getLastAppliedTerm());
156 assertEquals("getLastIndex", 0, snapshot.getLastIndex());
157 assertEquals("getLastTerm", 1, snapshot.getLastTerm());
158 }, ByteState.empty());
160 TEST_LOG.info("testSnapshotAfterStartupWithMigratedApplyJournalEntries ending");
/**
 * Negative case: the journal holds only the imported persisted message types (UpdateElectionTerm,
 * SimpleReplicatedLogEntry, ApplyJournalEntries). After the actor completes recovery and a 750 ms
 * pause, the in-memory snapshot store must contain no snapshot for the actor.
 */
164 public void testNoSnapshotAfterStartupWithNoMigratedMessages() {
165 TEST_LOG.info("testNoSnapshotAfterStartupWithNoMigratedMessages starting");
166 String id = factory.generateActorId("test-actor-");
168 InMemoryJournal.addEntry(id, 1, new UpdateElectionTerm(1, id));
169 InMemoryJournal.addEntry(id, 2, new SimpleReplicatedLogEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
170 InMemoryJournal.addEntry(id, 3, new ApplyJournalEntries(0));
// The raft policy is set to DisableElectionsRaftPolicy for this actor.
172 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
173 config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
// Snapshot cohort stub: a capture request is answered with an empty ByteState.
175 RaftActorSnapshotCohort snapshotCohort = new RaftActorSnapshotCohort() {
177 public void createSnapshot(ActorRef actorRef, java.util.Optional<OutputStream> installSnapshotStream) {
178 actorRef.tell(new CaptureSnapshotReply(ByteState.empty(), installSnapshotStream), actorRef);
182 public void applySnapshot(Snapshot.State snapshotState) {
// Deserialization is not supported by this stub.
186 public State deserializeSnapshot(ByteSource snapshotBytes) {
187 throw new UnsupportedOperationException();
191 TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(id)
192 .config(config).snapshotCohort(snapshotCohort).persistent(Optional.of(true)).props()
193 .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
194 MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
196 mockRaftActor.waitForRecoveryComplete();
// Fixed pause before checking the store; presumably gives an unwanted snapshot time to appear - confirm.
198 Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
200 List<Snapshot> snapshots = InMemorySnapshotStore.getSnapshots(id, Snapshot.class);
201 assertEquals("Snapshots", 0, snapshots.size());
203 TEST_LOG.info("testNoSnapshotAfterStartupWithNoMigratedMessages ending");
/**
 * Seeds the journal with an UpdateElectionTerm and a raft.ReplicatedLogImplEntry, then runs the
 * shared scenario with persistence enabled and verifies that the saved snapshot contains exactly
 * one unapplied entry with term 1, index 0 and data equal to the expected payload.
 */
207 public void testSnapshotAfterStartupWithMigratedReplicatedLogEntry() {
208 TEST_LOG.info("testSnapshotAfterStartupWithMigratedReplicatedLogEntry starting");
210 String persistenceId = factory.generateActorId("test-actor-");
212 InMemoryJournal.addEntry(persistenceId, 1, new UpdateElectionTerm(1, persistenceId));
213 MockRaftActorContext.MockPayload expPayload = new MockRaftActorContext.MockPayload("A");
// Fully qualified ReplicatedLogImplEntry from the raft package is the log entry form under test.
214 InMemoryJournal.addEntry(persistenceId, 2, new org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry(
// Snapshot verifier: the journal entry must be reported as the single unapplied entry.
217 doTestSnapshotAfterStartupWithMigratedMessage(persistenceId, true, snapshot -> {
218 assertEquals("Unapplied entries size", 1, snapshot.getUnAppliedEntries().size());
219 assertEquals("Unapplied entry term", 1, snapshot.getUnAppliedEntries().get(0).getTerm());
220 assertEquals("Unapplied entry index", 0, snapshot.getUnAppliedEntries().get(0).getIndex());
221 assertEquals("Unapplied entry data", expPayload, snapshot.getUnAppliedEntries().get(0).getData());
222 }, ByteState.empty());
224 TEST_LOG.info("testSnapshotAfterStartupWithMigratedReplicatedLogEntry ending");
/**
 * Shared scenario for the server-configuration-payload tests. Seeds the journal with an
 * UpdateElectionTerm and a SimpleReplicatedLogEntry wrapping a raft.ServerConfigurationPayload, then
 * runs doTestSnapshotAfterStartupWithMigratedMessage and verifies the saved snapshot's votedFor,
 * election term (1) and server configuration against the persisted.ServerConfigurationPayload form.
 *
 * @param persistent passed through to doTestSnapshotAfterStartupWithMigratedMessage
 * @return the test actor reference; NOTE(review): presumably the actor obtained from
 *         doTestSnapshotAfterStartupWithMigratedMessage - confirm
 */
227 private TestActorRef<MockRaftActor> doTestSnapshotAfterStartupWithMigratedServerConfigPayload(boolean persistent) {
228 String persistenceId = factory.generateActorId("test-actor-");
// Payload as written to the journal: the fully qualified raft.ServerConfigurationPayload class.
230 org.opendaylight.controller.cluster.raft.ServerConfigurationPayload persistedServerConfig =
231 new org.opendaylight.controller.cluster.raft.ServerConfigurationPayload(Arrays.asList(
232 new org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo(
233 persistenceId, true),
234 new org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo(
// Expected configuration, expressed with the imported persisted.ServerConfigurationPayload/ServerInfo types.
237 ServerConfigurationPayload expectedServerConfig = new ServerConfigurationPayload(Arrays.asList(
238 new ServerInfo(persistenceId, true), new ServerInfo("downNode", true)));
240 InMemoryJournal.addEntry(persistenceId, 1, new UpdateElectionTerm(1, persistenceId));
241 InMemoryJournal.addEntry(persistenceId, 3, new SimpleReplicatedLogEntry(0, 1, persistedServerConfig));
243 TestActorRef<MockRaftActor> actor = doTestSnapshotAfterStartupWithMigratedMessage(persistenceId,
244 persistent, snapshot -> {
245 assertEquals("getElectionVotedFor", persistenceId, snapshot.getElectionVotedFor());
246 assertEquals("getElectionTerm", 1, snapshot.getElectionTerm());
// Compared as sets so the order of the server entries does not matter.
247 assertEquals("getServerConfiguration", new HashSet<>(expectedServerConfig.getServerConfig()),
248 new HashSet<>(snapshot.getServerConfiguration().getServerConfig()));
249 }, ByteState.empty());
/**
 * Stores a raft.Snapshot (the non-persisted-package class) in the in-memory snapshot store, runs
 * the shared scenario with persistence enabled and verifies that the newly saved snapshot carries
 * the same indexes, terms, unapplied entry, election info and server configuration as the stored
 * one, and that its state equals a MockSnapshotState built from the same data.
 */
255 public void testSnapshotAfterStartupWithMigratedSnapshot() throws Exception {
256 TEST_LOG.info("testSnapshotAfterStartupWithMigratedSnapshot starting");
258 String persistenceId = factory.generateActorId("test-actor-");
// Expected state after migration: the snapshot data wrapped in a MockSnapshotState.
260 List<Object> snapshotData = Arrays.asList(new MockPayload("1"));
261 final MockSnapshotState snapshotState = new MockSnapshotState(snapshotData);
// The stored snapshot holds the same data in Java-serialized form plus one unapplied log entry.
263 org.opendaylight.controller.cluster.raft.Snapshot legacy = org.opendaylight.controller.cluster.raft.Snapshot
264 .create(SerializationUtils.serialize((Serializable) snapshotData),
265 Arrays.asList(new SimpleReplicatedLogEntry(6, 2, new MockPayload("payload"))),
266 6, 2, 5, 1, 3, "member-1", new ServerConfigurationPayload(Arrays.asList(
267 new ServerInfo(persistenceId, true), new ServerInfo("2", false))));
268 InMemorySnapshotStore.addSnapshot(persistenceId, legacy);
// Snapshot verifier: every field of the new snapshot is compared with the stored snapshot.
270 doTestSnapshotAfterStartupWithMigratedMessage(persistenceId, true, snapshot -> {
271 assertEquals("getLastIndex", legacy.getLastIndex(), snapshot.getLastIndex());
272 assertEquals("getLastTerm", legacy.getLastTerm(), snapshot.getLastTerm());
273 assertEquals("getLastAppliedIndex", legacy.getLastAppliedIndex(), snapshot.getLastAppliedIndex());
274 assertEquals("getLastAppliedTerm", legacy.getLastAppliedTerm(), snapshot.getLastAppliedTerm());
275 assertEquals("getState", snapshotState, snapshot.getState());
276 assertEquals("Unapplied entries size", legacy.getUnAppliedEntries().size(),
277 snapshot.getUnAppliedEntries().size());
278 assertEquals("Unapplied entry term", legacy.getUnAppliedEntries().get(0).getTerm(),
279 snapshot.getUnAppliedEntries().get(0).getTerm());
280 assertEquals("Unapplied entry index", legacy.getUnAppliedEntries().get(0).getIndex(),
281 snapshot.getUnAppliedEntries().get(0).getIndex());
282 assertEquals("Unapplied entry data", legacy.getUnAppliedEntries().get(0).getData(),
283 snapshot.getUnAppliedEntries().get(0).getData());
284 assertEquals("getElectionVotedFor", legacy.getElectionVotedFor(), snapshot.getElectionVotedFor());
285 assertEquals("getElectionTerm", legacy.getElectionTerm(), snapshot.getElectionTerm());
// Compared as sets so the order of the server entries does not matter.
286 assertEquals("getServerConfiguration", Sets.newHashSet(legacy.getServerConfiguration().getServerConfig()),
287 Sets.newHashSet(snapshot.getServerConfiguration().getServerConfig()));
290 TEST_LOG.info("testSnapshotAfterStartupWithMigratedSnapshot ending");
/**
 * Common driver for the migrated-message tests. Creates a MockRaftActor for the given id (elections
 * disabled, one peer named "peer" with an empty address), waits for recovery to complete, waits for
 * a snapshot to be saved to the in-memory snapshot store, hands that snapshot to the verifier, then
 * waits for the journal's delete-messages notification and asserts the actor's journal is empty.
 *
 * @param id persistence id of the actor; callers seed the journal/snapshot store under this id
 * @param persistent value passed to the builder's persistent(...) setting
 * @param snapshotVerifier assertions to run against the saved snapshot
 * @param snapshotState state the stub snapshot cohort replies with when asked to create a snapshot
 * @return the test actor reference; NOTE(review): presumably the actor created here - confirm
 */
293 @SuppressWarnings("checkstyle:IllegalCatch")
294 private TestActorRef<MockRaftActor> doTestSnapshotAfterStartupWithMigratedMessage(String id, boolean persistent,
295 Consumer<Snapshot> snapshotVerifier, final State snapshotState) {
// Register the latches before the actor starts so the waits further down can observe both events.
296 InMemorySnapshotStore.addSnapshotSavedLatch(id);
297 InMemoryJournal.addDeleteMessagesCompleteLatch(id);
298 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
299 config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
// Snapshot cohort stub: a capture request is answered with the caller-supplied snapshotState.
301 RaftActorSnapshotCohort snapshotCohort = new RaftActorSnapshotCohort() {
303 public void createSnapshot(ActorRef actorRef, java.util.Optional<OutputStream> installSnapshotStream) {
304 actorRef.tell(new CaptureSnapshotReply(snapshotState, installSnapshotStream), actorRef);
308 public void applySnapshot(State newState) {
// Deserialization is not supported by this stub.
312 public State deserializeSnapshot(ByteSource snapshotBytes) {
313 throw new UnsupportedOperationException();
317 TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(id)
318 .config(config).snapshotCohort(snapshotCohort).persistent(Optional.of(persistent))
319 .peerAddresses(ImmutableMap.of("peer", "")).props()
320 .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
321 MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
323 mockRaftActor.waitForRecoveryComplete();
// A snapshot is expected to be saved after startup; verify its content with the caller's checks.
325 Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class);
326 snapshotVerifier.accept(snapshot);
// After the delete-messages notification the journal for this id must hold no entries.
328 InMemoryJournal.waitForDeleteMessagesComplete(id);
330 assertEquals("InMemoryJournal size", 0, InMemoryJournal.get(id).size());