Fix segmented journal replay
[controller.git] / opendaylight / md-sal / sal-akka-segmented-journal / src / test / java / org / opendaylight / controller / akka / segjournal / SegmentedFileJournalTest.java
1 /*
2  * Copyright (c) 2019 Pantheon Technologies, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.akka.segjournal;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertNull;
13 import static org.junit.Assert.assertTrue;
14 import static org.mockito.ArgumentMatchers.any;
15 import static org.mockito.Mockito.doNothing;
16 import static org.mockito.Mockito.mock;
17 import static org.mockito.Mockito.times;
18 import static org.mockito.Mockito.verify;
19
20 import akka.actor.ActorRef;
21 import akka.actor.ActorSystem;
22 import akka.actor.PoisonPill;
23 import akka.persistence.AtomicWrite;
24 import akka.persistence.PersistentRepr;
25 import akka.testkit.CallingThreadDispatcher;
26 import akka.testkit.javadsl.TestKit;
27 import io.atomix.storage.StorageLevel;
28 import java.io.File;
29 import java.io.IOException;
30 import java.io.Serializable;
31 import java.nio.file.Files;
32 import java.nio.file.Path;
33 import java.util.ArrayList;
34 import java.util.List;
35 import java.util.Optional;
36 import java.util.function.Consumer;
37 import java.util.stream.Collectors;
38 import org.apache.commons.io.FileUtils;
39 import org.junit.After;
40 import org.junit.AfterClass;
41 import org.junit.Before;
42 import org.junit.BeforeClass;
43 import org.junit.Test;
44 import org.opendaylight.controller.akka.segjournal.SegmentedJournalActor.AsyncMessage;
45 import org.opendaylight.controller.akka.segjournal.SegmentedJournalActor.WriteMessages;
46 import scala.concurrent.Future;
47
48 public class SegmentedFileJournalTest {
49     private static final File DIRECTORY = new File("target/sfj-test");
50     private static final int SEGMENT_SIZE = 1024 * 1024;
51     private static final int MESSAGE_SIZE = 512 * 1024;
52
53     private static ActorSystem SYSTEM;
54
55     private TestKit kit;
56     private ActorRef actor;
57
58     @BeforeClass
59     public static void beforeClass() {
60         SYSTEM = ActorSystem.create("test");
61     }
62
63     @AfterClass
64     public static void afterClass() {
65         TestKit.shutdownActorSystem(SYSTEM);
66         SYSTEM = null;
67     }
68
69     @Before
70     public void before() {
71         kit = new TestKit(SYSTEM);
72         FileUtils.deleteQuietly(DIRECTORY);
73         actor = actor();
74     }
75
76     @After
77     public void after() {
78         actor.tell(PoisonPill.getInstance(), ActorRef.noSender());
79     }
80
81     @Test
82     public void testDeleteAfterStop() {
83         // Preliminary setup
84         final WriteMessages write = new WriteMessages();
85         final Future<Optional<Exception>> first = write.add(AtomicWrite.apply(PersistentRepr.apply("first", 1, "foo",
86             null, false, kit.getRef(), "uuid")));
87         final Future<Optional<Exception>> second = write.add(AtomicWrite.apply(PersistentRepr.apply("second", 2, "foo",
88             null, false, kit.getRef(), "uuid")));
89         actor.tell(write, ActorRef.noSender());
90         assertFalse(getFuture(first).isPresent());
91         assertFalse(getFuture(second).isPresent());
92
93         assertHighestSequenceNr(2);
94         assertReplayCount(2);
95
96         deleteEntries(1);
97
98         assertHighestSequenceNr(2);
99         assertReplayCount(1);
100
101         // Restart actor
102         actor.tell(PoisonPill.getInstance(), ActorRef.noSender());
103         actor = actor();
104
105         // Check if state is retained
106         assertHighestSequenceNr(2);
107         assertReplayCount(1);
108     }
109
110     @Test
111     public void testSegmentation() throws IOException {
112         // We want to have roughly three segments
113         final LargePayload payload = new LargePayload();
114
115         final WriteMessages write = new WriteMessages();
116         final List<Future<Optional<Exception>>> requests = new ArrayList<>();
117
118         // Each payload is half of segment size, plus some overhead, should result in two segments being present
119         for (int i = 1; i <= SEGMENT_SIZE * 3 / MESSAGE_SIZE; ++i) {
120             requests.add(write.add(AtomicWrite.apply(PersistentRepr.apply(payload, i, "foo", null, false, kit.getRef(),
121                 "uuid"))));
122         }
123
124         actor.tell(write, ActorRef.noSender());
125         requests.forEach(future -> assertFalse(getFuture(future).isPresent()));
126
127         assertFileCount(2, 1);
128
129         // Delete all but the last entry
130         deleteEntries(requests.size());
131
132         assertFileCount(1, 1);
133     }
134
135     @Test
136     public void testComplexDeletesAndPartialReplays() throws Exception {
137         for (int i = 0; i <= 4; i++) {
138             writeBigPaylod();
139         }
140
141         assertFileCount(10, 1);
142
143         // delete including index 3, so get rid of the first segment
144         deleteEntries(3);
145         assertFileCount(9, 1);
146
147         // get rid of segments 2(index 4-6) and 3(index 7-9)
148         deleteEntries(9);
149         assertFileCount(7, 1);
150
151         // get rid of all segments except the last one
152         deleteEntries(27);
153         assertFileCount(1, 1);
154
155         restartActor();
156
157         // Check if state is retained
158         assertHighestSequenceNr(30);
159         // 28,29,30 replayed
160         assertReplayCount(3);
161
162
163         deleteEntries(28);
164         restartActor();
165
166         assertHighestSequenceNr(30);
167         // 29,30 replayed
168         assertReplayCount(2);
169
170         deleteEntries(29);
171         restartActor();
172
173         // 30 replayed
174         assertReplayCount(1);
175
176         deleteEntries(30);
177         restartActor();
178
179         // nothing replayed
180         assertReplayCount(0);
181     }
182
183     private void restartActor() {
184         actor.tell(PoisonPill.getInstance(), ActorRef.noSender());
185         actor = actor();
186     }
187
188     private void writeBigPaylod() {
189         final LargePayload payload = new LargePayload();
190
191         final WriteMessages write = new WriteMessages();
192         final List<Future<Optional<Exception>>> requests = new ArrayList<>();
193
194         // Each payload is half of segment size, plus some overhead, should result in two segments being present
195         for (int i = 1; i <= SEGMENT_SIZE * 3 / MESSAGE_SIZE; ++i) {
196             requests.add(write.add(AtomicWrite.apply(PersistentRepr.apply(payload, i, "foo", null, false, kit.getRef(),
197                     "uuid"))));
198         }
199
200         actor.tell(write, ActorRef.noSender());
201         requests.forEach(future -> assertFalse(getFuture(future).isPresent()));
202     }
203
204     private ActorRef actor() {
205         return kit.childActorOf(SegmentedJournalActor.props("foo", DIRECTORY, StorageLevel.DISK, MESSAGE_SIZE,
206             SEGMENT_SIZE).withDispatcher(CallingThreadDispatcher.Id()));
207     }
208
209     private void deleteEntries(final long deleteTo) {
210         final AsyncMessage<Void> delete = SegmentedJournalActor.deleteMessagesTo(deleteTo);
211         actor.tell(delete, ActorRef.noSender());
212         assertNull(get(delete));
213     }
214
215     private void assertHighestSequenceNr(final long expected) {
216         AsyncMessage<Long> highest = SegmentedJournalActor.readHighestSequenceNr(0);
217         actor.tell(highest, ActorRef.noSender());
218         assertEquals(expected, (long) get(highest));
219     }
220
221     private void assertReplayCount(final int expected) {
222         Consumer<PersistentRepr> firstCallback = mock(Consumer.class);
223         doNothing().when(firstCallback).accept(any(PersistentRepr.class));
224         AsyncMessage<Void> replay = SegmentedJournalActor.replayMessages(0, Long.MAX_VALUE, Long.MAX_VALUE,
225             firstCallback);
226         actor.tell(replay, ActorRef.noSender());
227         assertNull(get(replay));
228         verify(firstCallback, times(expected)).accept(any(PersistentRepr.class));
229     }
230
231     private static void assertFileCount(final long dataFiles, final long deleteFiles) throws IOException {
232         List<File> contents = Files.list(DIRECTORY.toPath()).map(Path::toFile).collect(Collectors.toList());
233         assertEquals(dataFiles, contents.stream().filter(file -> file.getName().startsWith("data-")).count());
234         assertEquals(deleteFiles, contents.stream().filter(file -> file.getName().startsWith("delete-")).count());
235     }
236
237     private static <T> T get(final AsyncMessage<T> message) {
238         return getFuture(message.promise.future());
239     }
240
241     private static <T> T getFuture(final Future<T> future) {
242         assertTrue(future.isCompleted());
243         return future.value().get().get();
244     }
245
246     private static final class LargePayload implements Serializable {
247         private static final long serialVersionUID = 1L;
248
249         final byte[] bytes = new byte[MESSAGE_SIZE / 2];
250
251     }
252 }