Remove deprecated persisted raft payloads
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / MigratedMessagesTest.java
1 /*
2  * Copyright (c) 2016 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import static org.junit.Assert.assertEquals;
11
12 import akka.actor.ActorRef;
13 import akka.dispatch.Dispatchers;
14 import akka.testkit.TestActorRef;
15 import com.google.common.base.Optional;
16 import com.google.common.collect.ImmutableMap;
17 import com.google.common.collect.Sets;
18 import com.google.common.io.ByteSource;
19 import com.google.common.util.concurrent.Uninterruptibles;
20 import java.io.OutputStream;
21 import java.io.Serializable;
22 import java.util.Arrays;
23 import java.util.List;
24 import java.util.concurrent.TimeUnit;
25 import java.util.function.Consumer;
26 import org.apache.commons.lang3.SerializationUtils;
27 import org.junit.After;
28 import org.junit.Before;
29 import org.junit.Test;
30 import org.opendaylight.controller.cluster.raft.MockRaftActor.MockSnapshotState;
31 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
32 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
33 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
34 import org.opendaylight.controller.cluster.raft.persisted.ByteState;
35 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
36 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
37 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
38 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
39 import org.opendaylight.controller.cluster.raft.persisted.Snapshot.State;
40 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
41 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
42 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
43 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
46
47 /**
48  * Unit tests for migrated messages on recovery.
49  *
50  * @author Thomas Pantelis
51  */
52 public class MigratedMessagesTest extends AbstractActorTest {
53     static final Logger TEST_LOG = LoggerFactory.getLogger(MigratedMessagesTest.class);
54
55     private TestActorFactory factory;
56
57     @Before
58     public void setUp() {
59         factory = new TestActorFactory(getSystem());
60     }
61
62     @After
63     public void tearDown() throws Exception {
64         factory.close();
65         InMemoryJournal.clear();
66         InMemorySnapshotStore.clear();
67     }
68
69     @Test
70     public void testNoSnapshotAfterStartupWithNoMigratedMessages() {
71         TEST_LOG.info("testNoSnapshotAfterStartupWithNoMigratedMessages starting");
72         String id = factory.generateActorId("test-actor-");
73
74         InMemoryJournal.addEntry(id, 1, new UpdateElectionTerm(1, id));
75         InMemoryJournal.addEntry(id, 2, new SimpleReplicatedLogEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
76         InMemoryJournal.addEntry(id, 3, new ApplyJournalEntries(0));
77
78         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
79         config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
80
81         RaftActorSnapshotCohort snapshotCohort = new RaftActorSnapshotCohort() {
82             @Override
83             public void createSnapshot(ActorRef actorRef, java.util.Optional<OutputStream> installSnapshotStream) {
84                 actorRef.tell(new CaptureSnapshotReply(ByteState.empty(), installSnapshotStream), actorRef);
85             }
86
87             @Override
88             public void applySnapshot(Snapshot.State snapshotState) {
89             }
90
91             @Override
92             public State deserializeSnapshot(ByteSource snapshotBytes) {
93                 throw new UnsupportedOperationException();
94             }
95         };
96
97         TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(id)
98                 .config(config).snapshotCohort(snapshotCohort).persistent(Optional.of(true)).props()
99                     .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
100         MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
101
102         mockRaftActor.waitForRecoveryComplete();
103
104         Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
105
106         List<Snapshot> snapshots = InMemorySnapshotStore.getSnapshots(id, Snapshot.class);
107         assertEquals("Snapshots", 0, snapshots.size());
108
109         TEST_LOG.info("testNoSnapshotAfterStartupWithNoMigratedMessages ending");
110     }
111
112     @Test
113     public void testSnapshotAfterStartupWithMigratedSnapshot() throws Exception {
114         TEST_LOG.info("testSnapshotAfterStartupWithMigratedSnapshot starting");
115
116         String persistenceId = factory.generateActorId("test-actor-");
117
118         List<Object> snapshotData = Arrays.asList(new MockPayload("1"));
119         final MockSnapshotState snapshotState = new MockSnapshotState(snapshotData);
120
121         org.opendaylight.controller.cluster.raft.Snapshot legacy = org.opendaylight.controller.cluster.raft.Snapshot
122             .create(SerializationUtils.serialize((Serializable) snapshotData),
123                 Arrays.asList(new SimpleReplicatedLogEntry(6, 2, new MockPayload("payload"))),
124                 6, 2, 5, 1, 3, "member-1", new ServerConfigurationPayload(Arrays.asList(
125                         new ServerInfo(persistenceId, true), new ServerInfo("2", false))));
126         InMemorySnapshotStore.addSnapshot(persistenceId, legacy);
127
128         doTestSnapshotAfterStartupWithMigratedMessage(persistenceId, true, snapshot -> {
129             assertEquals("getLastIndex", legacy.getLastIndex(), snapshot.getLastIndex());
130             assertEquals("getLastTerm", legacy.getLastTerm(), snapshot.getLastTerm());
131             assertEquals("getLastAppliedIndex", legacy.getLastAppliedIndex(), snapshot.getLastAppliedIndex());
132             assertEquals("getLastAppliedTerm", legacy.getLastAppliedTerm(), snapshot.getLastAppliedTerm());
133             assertEquals("getState", snapshotState, snapshot.getState());
134             assertEquals("Unapplied entries size", legacy.getUnAppliedEntries().size(),
135                     snapshot.getUnAppliedEntries().size());
136             assertEquals("Unapplied entry term", legacy.getUnAppliedEntries().get(0).getTerm(),
137                     snapshot.getUnAppliedEntries().get(0).getTerm());
138             assertEquals("Unapplied entry index", legacy.getUnAppliedEntries().get(0).getIndex(),
139                     snapshot.getUnAppliedEntries().get(0).getIndex());
140             assertEquals("Unapplied entry data", legacy.getUnAppliedEntries().get(0).getData(),
141                     snapshot.getUnAppliedEntries().get(0).getData());
142             assertEquals("getElectionVotedFor", legacy.getElectionVotedFor(), snapshot.getElectionVotedFor());
143             assertEquals("getElectionTerm", legacy.getElectionTerm(), snapshot.getElectionTerm());
144             assertEquals("getServerConfiguration", Sets.newHashSet(legacy.getServerConfiguration().getServerConfig()),
145                     Sets.newHashSet(snapshot.getServerConfiguration().getServerConfig()));
146         }, snapshotState);
147
148         TEST_LOG.info("testSnapshotAfterStartupWithMigratedSnapshot ending");
149     }
150
151     @SuppressWarnings("checkstyle:IllegalCatch")
152     private TestActorRef<MockRaftActor> doTestSnapshotAfterStartupWithMigratedMessage(String id, boolean persistent,
153             Consumer<Snapshot> snapshotVerifier, final State snapshotState) {
154         InMemorySnapshotStore.addSnapshotSavedLatch(id);
155         InMemoryJournal.addDeleteMessagesCompleteLatch(id);
156         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
157         config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
158
159         RaftActorSnapshotCohort snapshotCohort = new RaftActorSnapshotCohort() {
160             @Override
161             public void createSnapshot(ActorRef actorRef, java.util.Optional<OutputStream> installSnapshotStream) {
162                 actorRef.tell(new CaptureSnapshotReply(snapshotState, installSnapshotStream), actorRef);
163             }
164
165             @Override
166             public void applySnapshot(State newState) {
167             }
168
169             @Override
170             public State deserializeSnapshot(ByteSource snapshotBytes) {
171                 throw new UnsupportedOperationException();
172             }
173         };
174
175         TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(id)
176                 .config(config).snapshotCohort(snapshotCohort).persistent(Optional.of(persistent))
177                 .peerAddresses(ImmutableMap.of("peer", "")).props()
178                     .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
179         MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
180
181         mockRaftActor.waitForRecoveryComplete();
182
183         Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class);
184         snapshotVerifier.accept(snapshot);
185
186         InMemoryJournal.waitForDeleteMessagesComplete(id);
187
188         assertEquals("InMemoryJournal size", 0, InMemoryJournal.get(id).size());
189
190         return raftActorRef;
191     }
192
193 }