460dd4a445a8306731499e205b1207c563e1e110
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / MigratedMessagesTest.java
1 /*
2  * Copyright (c) 2016 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import static org.junit.Assert.assertEquals;
11
12 import akka.actor.ActorRef;
13 import akka.dispatch.Dispatchers;
14 import akka.testkit.JavaTestKit;
15 import akka.testkit.TestActorRef;
16 import com.google.common.base.Optional;
17 import com.google.common.collect.ImmutableMap;
18 import com.google.common.collect.Sets;
19 import com.google.common.io.ByteSource;
20 import com.google.common.util.concurrent.Uninterruptibles;
21 import java.io.OutputStream;
22 import java.io.Serializable;
23 import java.util.Arrays;
24 import java.util.HashSet;
25 import java.util.List;
26 import java.util.concurrent.TimeUnit;
27 import java.util.function.Consumer;
28 import org.apache.commons.lang3.SerializationUtils;
29 import org.junit.After;
30 import org.junit.Before;
31 import org.junit.Test;
32 import org.opendaylight.controller.cluster.raft.MockRaftActor.MockSnapshotState;
33 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
34 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
35 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
36 import org.opendaylight.controller.cluster.raft.persisted.ByteState;
37 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
38 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
39 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
40 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
41 import org.opendaylight.controller.cluster.raft.persisted.Snapshot.State;
42 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
43 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
44 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
45 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
48
49 /**
50  * Unit tests for migrated messages on recovery.
51  *
52  * @author Thomas Pantelis
53  */
54 public class MigratedMessagesTest extends AbstractActorTest {
55     static final Logger TEST_LOG = LoggerFactory.getLogger(MigratedMessagesTest.class);
56
57     private TestActorFactory factory;
58
59     @Before
60     public void setUp() {
61         factory = new TestActorFactory(getSystem());
62     }
63
64     @After
65     public void tearDown() throws Exception {
66         factory.close();
67         InMemoryJournal.clear();
68         InMemorySnapshotStore.clear();
69     }
70
71     @Test
72     public void testSnapshotAfterStartupWithMigratedServerConfigPayloadAndPersistenceEnabled() {
73         TEST_LOG.info("testSnapshotAfterStartupWithMigratedServerConfigPayloadAndPersistenceEnabled starting");
74         doTestSnapshotAfterStartupWithMigratedServerConfigPayload(true);
75         TEST_LOG.info("testSnapshotAfterStartupWithMigratedServerConfigPayloadAndPersistenceEnabled ending");
76     }
77
78     @Test
79     public void testSnapshotAfterStartupWithMigratedServerConfigPayloadAndPersistenceDisabled() {
80         TEST_LOG.info("testSnapshotAfterStartupWithMigratedServerConfigPayloadAndPersistenceDisabled starting");
81
82         TestActorRef<MockRaftActor> actor = doTestSnapshotAfterStartupWithMigratedServerConfigPayload(false);
83         MockRaftActor mockRaftActor = actor.underlyingActor();
84         String id = mockRaftActor.persistenceId();
85         ConfigParams config = mockRaftActor.getRaftActorContext().getConfigParams();
86
87         factory.killActor(actor, new JavaTestKit(getSystem()));
88
89         actor = factory.createTestActor(MockRaftActor.builder().id(id).config(config)
90                 .persistent(Optional.of(false)).props().withDispatcher(Dispatchers.DefaultDispatcherId()), id);
91         mockRaftActor = actor.underlyingActor();
92         mockRaftActor.waitForRecoveryComplete();
93
94         assertEquals("electionTerm", 1,
95                 mockRaftActor.getRaftActorContext().getTermInformation().getCurrentTerm());
96         assertEquals("votedFor", id,
97                 mockRaftActor.getRaftActorContext().getTermInformation().getVotedFor());
98
99         TEST_LOG.info("testSnapshotAfterStartupWithMigratedServerConfigPayloadAndPersistenceDisabled ending");
100     }
101
102     @Test
103     public void testSnapshotAfterStartupWithMigratedUpdateElectionTermAndPersistenceEnabled() {
104         TEST_LOG.info("testSnapshotAfterStartupWithMigratedUpdateElectionTermAndPersistenceEnabled starting");
105
106         String persistenceId = factory.generateActorId("test-actor-");
107
108         org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm updateElectionTerm =
109                 new org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm(5, persistenceId);
110
111         InMemoryJournal.addEntry(persistenceId, 1, updateElectionTerm);
112
113         doTestSnapshotAfterStartupWithMigratedMessage(persistenceId, true, snapshot -> {
114             assertEquals("getElectionVotedFor", persistenceId, snapshot.getElectionVotedFor());
115             assertEquals("getElectionTerm", 5, snapshot.getElectionTerm());
116         }, ByteState.empty());
117
118         TEST_LOG.info("testSnapshotAfterStartupWithMigratedUpdateElectionTermAndPersistenceEnabled ending");
119     }
120
121     @Test
122     public void testSnapshotAfterStartupWithMigratedUpdateElectionTermAndPersistenceDisabled() {
123         TEST_LOG.info("testSnapshotAfterStartupWithMigratedUpdateElectionTermAndPersistenceDisabled starting");
124
125         String persistenceId = factory.generateActorId("test-actor-");
126
127         org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm updateElectionTerm =
128                 new org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm(5, persistenceId);
129
130         InMemoryJournal.addEntry(persistenceId, 1, updateElectionTerm);
131
132         doTestSnapshotAfterStartupWithMigratedMessage(persistenceId, false, snapshot -> {
133             assertEquals("getElectionVotedFor", persistenceId, snapshot.getElectionVotedFor());
134             assertEquals("getElectionTerm", 5, snapshot.getElectionTerm());
135         }, ByteState.empty());
136
137         TEST_LOG.info("testSnapshotAfterStartupWithMigratedUpdateElectionTermAndPersistenceDisabled ending");
138     }
139
140     @Test
141     public void testSnapshotAfterStartupWithMigratedApplyJournalEntries() {
142         TEST_LOG.info("testSnapshotAfterStartupWithMigratedApplyJournalEntries starting");
143
144         String persistenceId = factory.generateActorId("test-actor-");
145
146         InMemoryJournal.addEntry(persistenceId, 1, new UpdateElectionTerm(1, persistenceId));
147         InMemoryJournal.addEntry(persistenceId, 2, new SimpleReplicatedLogEntry(0, 1,
148                 new MockRaftActorContext.MockPayload("A")));
149         InMemoryJournal.addEntry(persistenceId, 3,
150                 new org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries(0));
151
152
153         doTestSnapshotAfterStartupWithMigratedMessage(persistenceId, true, snapshot -> {
154             assertEquals("getLastAppliedIndex", 0, snapshot.getLastAppliedIndex());
155             assertEquals("getLastAppliedTerm", 1, snapshot.getLastAppliedTerm());
156             assertEquals("getLastIndex", 0, snapshot.getLastIndex());
157             assertEquals("getLastTerm", 1, snapshot.getLastTerm());
158         }, ByteState.empty());
159
160         TEST_LOG.info("testSnapshotAfterStartupWithMigratedApplyJournalEntries ending");
161     }
162
163     @Test
164     public void testNoSnapshotAfterStartupWithNoMigratedMessages() {
165         TEST_LOG.info("testNoSnapshotAfterStartupWithNoMigratedMessages starting");
166         String id = factory.generateActorId("test-actor-");
167
168         InMemoryJournal.addEntry(id, 1, new UpdateElectionTerm(1, id));
169         InMemoryJournal.addEntry(id, 2, new SimpleReplicatedLogEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
170         InMemoryJournal.addEntry(id, 3, new ApplyJournalEntries(0));
171
172         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
173         config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
174
175         RaftActorSnapshotCohort snapshotCohort = new RaftActorSnapshotCohort() {
176             @Override
177             public void createSnapshot(ActorRef actorRef, java.util.Optional<OutputStream> installSnapshotStream) {
178                 actorRef.tell(new CaptureSnapshotReply(ByteState.empty(), installSnapshotStream), actorRef);
179             }
180
181             @Override
182             public void applySnapshot(Snapshot.State snapshotState) {
183             }
184
185             @Override
186             public State deserializeSnapshot(ByteSource snapshotBytes) {
187                 throw new UnsupportedOperationException();
188             }
189         };
190
191         TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(id)
192                 .config(config).snapshotCohort(snapshotCohort).persistent(Optional.of(true)).props()
193                     .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
194         MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
195
196         mockRaftActor.waitForRecoveryComplete();
197
198         Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
199
200         List<Snapshot> snapshots = InMemorySnapshotStore.getSnapshots(id, Snapshot.class);
201         assertEquals("Snapshots", 0, snapshots.size());
202
203         TEST_LOG.info("testNoSnapshotAfterStartupWithNoMigratedMessages ending");
204     }
205
206     @Test
207     public void testSnapshotAfterStartupWithMigratedReplicatedLogEntry() {
208         TEST_LOG.info("testSnapshotAfterStartupWithMigratedReplicatedLogEntry starting");
209
210         String persistenceId = factory.generateActorId("test-actor-");
211
212         InMemoryJournal.addEntry(persistenceId, 1, new UpdateElectionTerm(1, persistenceId));
213         MockRaftActorContext.MockPayload expPayload = new MockRaftActorContext.MockPayload("A");
214         InMemoryJournal.addEntry(persistenceId, 2, new org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry(
215                 0, 1, expPayload));
216
217         doTestSnapshotAfterStartupWithMigratedMessage(persistenceId, true, snapshot -> {
218             assertEquals("Unapplied entries size", 1, snapshot.getUnAppliedEntries().size());
219             assertEquals("Unapplied entry term", 1, snapshot.getUnAppliedEntries().get(0).getTerm());
220             assertEquals("Unapplied entry index", 0, snapshot.getUnAppliedEntries().get(0).getIndex());
221             assertEquals("Unapplied entry data", expPayload, snapshot.getUnAppliedEntries().get(0).getData());
222         }, ByteState.empty());
223
224         TEST_LOG.info("testSnapshotAfterStartupWithMigratedReplicatedLogEntry ending");
225     }
226
227     private TestActorRef<MockRaftActor> doTestSnapshotAfterStartupWithMigratedServerConfigPayload(boolean persistent) {
228         String persistenceId = factory.generateActorId("test-actor-");
229
230         org.opendaylight.controller.cluster.raft.ServerConfigurationPayload persistedServerConfig =
231                 new org.opendaylight.controller.cluster.raft.ServerConfigurationPayload(Arrays.asList(
232                     new org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo(
233                             persistenceId, true),
234                     new org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo(
235                             "downNode", true)));
236
237         ServerConfigurationPayload expectedServerConfig = new ServerConfigurationPayload(Arrays.asList(
238                 new ServerInfo(persistenceId, true), new ServerInfo("downNode", true)));
239
240         InMemoryJournal.addEntry(persistenceId, 1, new UpdateElectionTerm(1, persistenceId));
241         InMemoryJournal.addEntry(persistenceId, 3, new SimpleReplicatedLogEntry(0, 1, persistedServerConfig));
242
243         TestActorRef<MockRaftActor> actor = doTestSnapshotAfterStartupWithMigratedMessage(persistenceId,
244             persistent, snapshot -> {
245                 assertEquals("getElectionVotedFor", persistenceId, snapshot.getElectionVotedFor());
246                 assertEquals("getElectionTerm", 1, snapshot.getElectionTerm());
247                 assertEquals("getServerConfiguration", new HashSet<>(expectedServerConfig.getServerConfig()),
248                         new HashSet<>(snapshot.getServerConfiguration().getServerConfig()));
249             }, ByteState.empty());
250
251         return actor;
252     }
253
254     @Test
255     public void testSnapshotAfterStartupWithMigratedSnapshot() throws Exception {
256         TEST_LOG.info("testSnapshotAfterStartupWithMigratedSnapshot starting");
257
258         String persistenceId = factory.generateActorId("test-actor-");
259
260         List<Object> snapshotData = Arrays.asList(new MockPayload("1"));
261         final MockSnapshotState snapshotState = new MockSnapshotState(snapshotData);
262
263         org.opendaylight.controller.cluster.raft.Snapshot legacy = org.opendaylight.controller.cluster.raft.Snapshot
264             .create(SerializationUtils.serialize((Serializable) snapshotData),
265                 Arrays.asList(new SimpleReplicatedLogEntry(6, 2, new MockPayload("payload"))),
266                 6, 2, 5, 1, 3, "member-1", new ServerConfigurationPayload(Arrays.asList(
267                         new ServerInfo(persistenceId, true), new ServerInfo("2", false))));
268         InMemorySnapshotStore.addSnapshot(persistenceId, legacy);
269
270         doTestSnapshotAfterStartupWithMigratedMessage(persistenceId, true, snapshot -> {
271             assertEquals("getLastIndex", legacy.getLastIndex(), snapshot.getLastIndex());
272             assertEquals("getLastTerm", legacy.getLastTerm(), snapshot.getLastTerm());
273             assertEquals("getLastAppliedIndex", legacy.getLastAppliedIndex(), snapshot.getLastAppliedIndex());
274             assertEquals("getLastAppliedTerm", legacy.getLastAppliedTerm(), snapshot.getLastAppliedTerm());
275             assertEquals("getState", snapshotState, snapshot.getState());
276             assertEquals("Unapplied entries size", legacy.getUnAppliedEntries().size(),
277                     snapshot.getUnAppliedEntries().size());
278             assertEquals("Unapplied entry term", legacy.getUnAppliedEntries().get(0).getTerm(),
279                     snapshot.getUnAppliedEntries().get(0).getTerm());
280             assertEquals("Unapplied entry index", legacy.getUnAppliedEntries().get(0).getIndex(),
281                     snapshot.getUnAppliedEntries().get(0).getIndex());
282             assertEquals("Unapplied entry data", legacy.getUnAppliedEntries().get(0).getData(),
283                     snapshot.getUnAppliedEntries().get(0).getData());
284             assertEquals("getElectionVotedFor", legacy.getElectionVotedFor(), snapshot.getElectionVotedFor());
285             assertEquals("getElectionTerm", legacy.getElectionTerm(), snapshot.getElectionTerm());
286             assertEquals("getServerConfiguration", Sets.newHashSet(legacy.getServerConfiguration().getServerConfig()),
287                     Sets.newHashSet(snapshot.getServerConfiguration().getServerConfig()));
288         }, snapshotState);
289
290         TEST_LOG.info("testSnapshotAfterStartupWithMigratedSnapshot ending");
291     }
292
293     @SuppressWarnings("checkstyle:IllegalCatch")
294     private TestActorRef<MockRaftActor> doTestSnapshotAfterStartupWithMigratedMessage(String id, boolean persistent,
295             Consumer<Snapshot> snapshotVerifier, final State snapshotState) {
296         InMemorySnapshotStore.addSnapshotSavedLatch(id);
297         InMemoryJournal.addDeleteMessagesCompleteLatch(id);
298         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
299         config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
300
301         RaftActorSnapshotCohort snapshotCohort = new RaftActorSnapshotCohort() {
302             @Override
303             public void createSnapshot(ActorRef actorRef, java.util.Optional<OutputStream> installSnapshotStream) {
304                 actorRef.tell(new CaptureSnapshotReply(snapshotState, installSnapshotStream), actorRef);
305             }
306
307             @Override
308             public void applySnapshot(State newState) {
309             }
310
311             @Override
312             public State deserializeSnapshot(ByteSource snapshotBytes) {
313                 throw new UnsupportedOperationException();
314             }
315         };
316
317         TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(id)
318                 .config(config).snapshotCohort(snapshotCohort).persistent(Optional.of(persistent))
319                 .peerAddresses(ImmutableMap.of("peer", "")).props()
320                     .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
321         MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
322
323         mockRaftActor.waitForRecoveryComplete();
324
325         Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class);
326         snapshotVerifier.accept(snapshot);
327
328         InMemoryJournal.waitForDeleteMessagesComplete(id);
329
330         assertEquals("InMemoryJournal size", 0, InMemoryJournal.get(id).size());
331
332         return raftActorRef;
333     }
334
335 }