Migrate most of CDS to use java.util.Optional
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / MigratedMessagesTest.java
1 /*
2  * Copyright (c) 2016 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import static org.junit.Assert.assertEquals;
11
12 import akka.actor.ActorRef;
13 import akka.dispatch.Dispatchers;
14 import akka.testkit.TestActorRef;
15 import com.google.common.base.Optional;
16 import com.google.common.collect.ImmutableMap;
17 import com.google.common.io.ByteSource;
18 import com.google.common.util.concurrent.Uninterruptibles;
19 import java.io.OutputStream;
20 import java.util.List;
21 import java.util.concurrent.TimeUnit;
22 import java.util.function.Consumer;
23 import org.junit.After;
24 import org.junit.Before;
25 import org.junit.Test;
26 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
27 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
28 import org.opendaylight.controller.cluster.raft.persisted.ByteState;
29 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
30 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
31 import org.opendaylight.controller.cluster.raft.persisted.Snapshot.State;
32 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
33 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
34 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
35 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
38
39 /**
40  * Unit tests for migrated messages on recovery.
41  *
42  * @author Thomas Pantelis
43  */
44 public class MigratedMessagesTest extends AbstractActorTest {
45     static final Logger TEST_LOG = LoggerFactory.getLogger(MigratedMessagesTest.class);
46
47     private TestActorFactory factory;
48
49     @Before
50     public void setUp() {
51         factory = new TestActorFactory(getSystem());
52     }
53
54     @After
55     public void tearDown() {
56         factory.close();
57         InMemoryJournal.clear();
58         InMemorySnapshotStore.clear();
59     }
60
61     @Test
62     public void testNoSnapshotAfterStartupWithNoMigratedMessages() {
63         TEST_LOG.info("testNoSnapshotAfterStartupWithNoMigratedMessages starting");
64         String id = factory.generateActorId("test-actor-");
65
66         InMemoryJournal.addEntry(id, 1, new UpdateElectionTerm(1, id));
67         InMemoryJournal.addEntry(id, 2, new SimpleReplicatedLogEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
68         InMemoryJournal.addEntry(id, 3, new ApplyJournalEntries(0));
69
70         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
71         config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
72
73         RaftActorSnapshotCohort snapshotCohort = new RaftActorSnapshotCohort() {
74             @Override
75             public void createSnapshot(ActorRef actorRef, java.util.Optional<OutputStream> installSnapshotStream) {
76                 actorRef.tell(new CaptureSnapshotReply(ByteState.empty(), installSnapshotStream), actorRef);
77             }
78
79             @Override
80             public void applySnapshot(Snapshot.State snapshotState) {
81             }
82
83             @Override
84             public State deserializeSnapshot(ByteSource snapshotBytes) {
85                 throw new UnsupportedOperationException();
86             }
87         };
88
89         TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(id)
90                 .config(config).snapshotCohort(snapshotCohort).persistent(Optional.of(true)).props()
91                     .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
92         MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
93
94         mockRaftActor.waitForRecoveryComplete();
95
96         Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
97
98         List<Snapshot> snapshots = InMemorySnapshotStore.getSnapshots(id, Snapshot.class);
99         assertEquals("Snapshots", 0, snapshots.size());
100
101         TEST_LOG.info("testNoSnapshotAfterStartupWithNoMigratedMessages ending");
102     }
103
104     @SuppressWarnings("checkstyle:IllegalCatch")
105     private TestActorRef<MockRaftActor> doTestSnapshotAfterStartupWithMigratedMessage(String id, boolean persistent,
106             Consumer<Snapshot> snapshotVerifier, final State snapshotState) {
107         InMemorySnapshotStore.addSnapshotSavedLatch(id);
108         InMemoryJournal.addDeleteMessagesCompleteLatch(id);
109         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
110         config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
111
112         RaftActorSnapshotCohort snapshotCohort = new RaftActorSnapshotCohort() {
113             @Override
114             public void createSnapshot(ActorRef actorRef, java.util.Optional<OutputStream> installSnapshotStream) {
115                 actorRef.tell(new CaptureSnapshotReply(snapshotState, installSnapshotStream), actorRef);
116             }
117
118             @Override
119             public void applySnapshot(State newState) {
120             }
121
122             @Override
123             public State deserializeSnapshot(ByteSource snapshotBytes) {
124                 throw new UnsupportedOperationException();
125             }
126         };
127
128         TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(id)
129                 .config(config).snapshotCohort(snapshotCohort).persistent(Optional.of(persistent))
130                 .peerAddresses(ImmutableMap.of("peer", "")).props()
131                     .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
132         MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
133
134         mockRaftActor.waitForRecoveryComplete();
135
136         Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class);
137         snapshotVerifier.accept(snapshot);
138
139         InMemoryJournal.waitForDeleteMessagesComplete(id);
140
141         assertEquals("InMemoryJournal size", 0, InMemoryJournal.get(id).size());
142
143         return raftActorRef;
144     }
145
146 }