/*
 * Copyright (c) 2016 Brocade Communications Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.cluster.raft;
10 import static org.junit.Assert.assertEquals;
12 import akka.actor.ActorRef;
13 import akka.dispatch.Dispatchers;
14 import akka.testkit.TestActorRef;
15 import com.google.common.base.Optional;
16 import com.google.common.collect.ImmutableMap;
17 import com.google.common.io.ByteSource;
18 import com.google.common.util.concurrent.Uninterruptibles;
19 import java.io.OutputStream;
20 import java.util.List;
21 import java.util.concurrent.TimeUnit;
22 import java.util.function.Consumer;
23 import org.junit.After;
24 import org.junit.Before;
25 import org.junit.Test;
26 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
27 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
28 import org.opendaylight.controller.cluster.raft.persisted.ByteState;
29 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
30 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
31 import org.opendaylight.controller.cluster.raft.persisted.Snapshot.State;
32 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
33 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
34 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
35 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
/**
 * Unit tests for migrated messages on recovery.
 *
 * @author Thomas Pantelis
 */
44 public class MigratedMessagesTest extends AbstractActorTest {
45 static final Logger TEST_LOG = LoggerFactory.getLogger(MigratedMessagesTest.class);
47 private TestActorFactory factory;
51 factory = new TestActorFactory(getSystem());
55 public void tearDown() {
57 InMemoryJournal.clear();
58 InMemorySnapshotStore.clear();
62 public void testNoSnapshotAfterStartupWithNoMigratedMessages() {
63 TEST_LOG.info("testNoSnapshotAfterStartupWithNoMigratedMessages starting");
64 String id = factory.generateActorId("test-actor-");
66 InMemoryJournal.addEntry(id, 1, new UpdateElectionTerm(1, id));
67 InMemoryJournal.addEntry(id, 2, new SimpleReplicatedLogEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
68 InMemoryJournal.addEntry(id, 3, new ApplyJournalEntries(0));
70 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
71 config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
73 RaftActorSnapshotCohort snapshotCohort = new RaftActorSnapshotCohort() {
75 public void createSnapshot(ActorRef actorRef, java.util.Optional<OutputStream> installSnapshotStream) {
76 actorRef.tell(new CaptureSnapshotReply(ByteState.empty(), installSnapshotStream), actorRef);
80 public void applySnapshot(Snapshot.State snapshotState) {
84 public State deserializeSnapshot(ByteSource snapshotBytes) {
85 throw new UnsupportedOperationException();
89 TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(id)
90 .config(config).snapshotCohort(snapshotCohort).persistent(Optional.of(true)).props()
91 .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
92 MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
94 mockRaftActor.waitForRecoveryComplete();
96 Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
98 List<Snapshot> snapshots = InMemorySnapshotStore.getSnapshots(id, Snapshot.class);
99 assertEquals("Snapshots", 0, snapshots.size());
101 TEST_LOG.info("testNoSnapshotAfterStartupWithNoMigratedMessages ending");
104 @SuppressWarnings("checkstyle:IllegalCatch")
105 private TestActorRef<MockRaftActor> doTestSnapshotAfterStartupWithMigratedMessage(String id, boolean persistent,
106 Consumer<Snapshot> snapshotVerifier, final State snapshotState) {
107 InMemorySnapshotStore.addSnapshotSavedLatch(id);
108 InMemoryJournal.addDeleteMessagesCompleteLatch(id);
109 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
110 config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
112 RaftActorSnapshotCohort snapshotCohort = new RaftActorSnapshotCohort() {
114 public void createSnapshot(ActorRef actorRef, java.util.Optional<OutputStream> installSnapshotStream) {
115 actorRef.tell(new CaptureSnapshotReply(snapshotState, installSnapshotStream), actorRef);
119 public void applySnapshot(State newState) {
123 public State deserializeSnapshot(ByteSource snapshotBytes) {
124 throw new UnsupportedOperationException();
128 TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(id)
129 .config(config).snapshotCohort(snapshotCohort).persistent(Optional.of(persistent))
130 .peerAddresses(ImmutableMap.of("peer", "")).props()
131 .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
132 MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
134 mockRaftActor.waitForRecoveryComplete();
136 Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class);
137 snapshotVerifier.accept(snapshot);
139 InMemoryJournal.waitForDeleteMessagesComplete(id);
141 assertEquals("InMemoryJournal size", 0, InMemoryJournal.get(id).size());