4d3db7980e2116ef013f62b13ed120ddb43c33f9
[controller.git] / opendaylight / md-sal / sal-akka-segmented-journal / src / test / java / org / opendaylight / controller / akka / segjournal / SegmentedFileJournalTest.java
1 /*
2  * Copyright (c) 2019 Pantheon Technologies, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.akka.segjournal;
9
10 import static org.junit.jupiter.api.Assertions.assertEquals;
11 import static org.junit.jupiter.api.Assertions.assertFalse;
12 import static org.junit.jupiter.api.Assertions.assertNull;
13 import static org.junit.jupiter.api.Assertions.assertTrue;
14 import static org.mockito.ArgumentMatchers.any;
15 import static org.mockito.Mockito.reset;
16 import static org.mockito.Mockito.times;
17 import static org.mockito.Mockito.verify;
18
19 import akka.actor.ActorRef;
20 import akka.actor.ActorSystem;
21 import akka.actor.PoisonPill;
22 import akka.persistence.AtomicWrite;
23 import akka.persistence.PersistentRepr;
24 import akka.testkit.CallingThreadDispatcher;
25 import akka.testkit.javadsl.TestKit;
26 import io.atomix.storage.journal.StorageLevel;
27 import java.io.File;
28 import java.io.IOException;
29 import java.io.Serializable;
30 import java.nio.file.Files;
31 import java.nio.file.Path;
32 import java.util.ArrayList;
33 import java.util.List;
34 import java.util.Optional;
35 import java.util.function.Consumer;
36 import java.util.stream.Collectors;
37 import org.apache.commons.io.FileUtils;
38 import org.junit.jupiter.api.AfterAll;
39 import org.junit.jupiter.api.AfterEach;
40 import org.junit.jupiter.api.BeforeAll;
41 import org.junit.jupiter.api.BeforeEach;
42 import org.junit.jupiter.api.Test;
43 import org.junit.jupiter.api.extension.ExtendWith;
44 import org.mockito.Mock;
45 import org.mockito.junit.jupiter.MockitoExtension;
46 import org.opendaylight.controller.akka.segjournal.SegmentedJournalActor.AsyncMessage;
47 import org.opendaylight.controller.akka.segjournal.SegmentedJournalActor.WriteMessages;
48 import scala.concurrent.Future;
49
50 @ExtendWith(MockitoExtension.class)
51 class SegmentedFileJournalTest {
52     private static final File DIRECTORY = new File("target/sfj-test");
53     private static final int SEGMENT_SIZE = 1024 * 1024;
54     private static final int MESSAGE_SIZE = 512 * 1024;
55     private static final int FLUSH_SIZE = 16 * 1024;
56
57     private static ActorSystem SYSTEM;
58
59     @Mock
60     private Consumer<PersistentRepr> firstCallback;
61
62     private TestKit kit;
63     private ActorRef actor;
64
65     @BeforeAll
66     static void beforeClass() {
67         SYSTEM = ActorSystem.create("test");
68     }
69
70     @AfterAll
71     static void afterClass() {
72         TestKit.shutdownActorSystem(SYSTEM);
73         SYSTEM = null;
74     }
75
76     @BeforeEach
77     void before() {
78         kit = new TestKit(SYSTEM);
79         FileUtils.deleteQuietly(DIRECTORY);
80         actor = actor();
81     }
82
83     @AfterEach
84     void after() {
85         actor.tell(PoisonPill.getInstance(), ActorRef.noSender());
86         FileUtils.deleteQuietly(DIRECTORY);
87     }
88
89     @Test
90     void testDeleteAfterStop() {
91         // Preliminary setup
92         final WriteMessages write = new WriteMessages();
93         final Future<Optional<Exception>> first = write.add(AtomicWrite.apply(PersistentRepr.apply("first", 1, "foo",
94             null, false, kit.getRef(), "uuid")));
95         final Future<Optional<Exception>> second = write.add(AtomicWrite.apply(PersistentRepr.apply("second", 2, "foo",
96             null, false, kit.getRef(), "uuid")));
97         actor.tell(write, ActorRef.noSender());
98         assertFalse(getFuture(first).isPresent());
99         assertFalse(getFuture(second).isPresent());
100
101         assertHighestSequenceNr(2);
102         assertReplayCount(2);
103
104         deleteEntries(1);
105
106         assertHighestSequenceNr(2);
107         assertReplayCount(1);
108
109         // Restart actor
110         actor.tell(PoisonPill.getInstance(), ActorRef.noSender());
111         actor = actor();
112
113         // Check if state is retained
114         assertHighestSequenceNr(2);
115         assertReplayCount(1);
116     }
117
118     @Test
119     void testSegmentation() throws IOException {
120         // We want to have roughly three segments
121         final LargePayload payload = new LargePayload();
122
123         final WriteMessages write = new WriteMessages();
124         final List<Future<Optional<Exception>>> requests = new ArrayList<>();
125
126         // Each payload is half of segment size, plus some overhead, should result in two segments being present
127         for (int i = 1; i <= SEGMENT_SIZE * 3 / MESSAGE_SIZE; ++i) {
128             requests.add(write.add(AtomicWrite.apply(PersistentRepr.apply(payload, i, "foo", null, false, kit.getRef(),
129                 "uuid"))));
130         }
131
132         actor.tell(write, ActorRef.noSender());
133         requests.forEach(future -> assertFalse(getFuture(future).isPresent()));
134
135         assertFileCount(2, 1);
136
137         // Delete all but the last entry
138         deleteEntries(requests.size());
139
140         assertFileCount(1, 1);
141     }
142
143     @Test
144     void testComplexDeletesAndPartialReplays() throws Exception {
145         for (int i = 0; i <= 4; i++) {
146             writeBigPaylod();
147         }
148
149         assertFileCount(10, 1);
150
151         // delete including index 3, so get rid of the first segment
152         deleteEntries(3);
153         assertFileCount(9, 1);
154
155         // get rid of segments 2(index 4-6) and 3(index 7-9)
156         deleteEntries(9);
157         assertFileCount(7, 1);
158
159         // get rid of all segments except the last one
160         deleteEntries(27);
161         assertFileCount(1, 1);
162
163         restartActor();
164
165         // Check if state is retained
166         assertHighestSequenceNr(30);
167         // 28,29,30 replayed
168         assertReplayCount(3);
169
170
171         deleteEntries(28);
172         restartActor();
173
174         assertHighestSequenceNr(30);
175         // 29,30 replayed
176         assertReplayCount(2);
177
178         deleteEntries(29);
179         restartActor();
180
181         // 30 replayed
182         assertReplayCount(1);
183
184         deleteEntries(30);
185         restartActor();
186
187         // nothing replayed
188         assertReplayCount(0);
189     }
190
191     private void restartActor() {
192         actor.tell(PoisonPill.getInstance(), ActorRef.noSender());
193         actor = actor();
194     }
195
196     private void writeBigPaylod() {
197         final LargePayload payload = new LargePayload();
198
199         final WriteMessages write = new WriteMessages();
200         final List<Future<Optional<Exception>>> requests = new ArrayList<>();
201
202         // Each payload is half of segment size, plus some overhead, should result in two segments being present
203         for (int i = 1; i <= SEGMENT_SIZE * 3 / MESSAGE_SIZE; ++i) {
204             requests.add(write.add(AtomicWrite.apply(PersistentRepr.apply(payload, i, "foo", null, false, kit.getRef(),
205                     "uuid"))));
206         }
207
208         actor.tell(write, ActorRef.noSender());
209         requests.forEach(future -> assertFalse(getFuture(future).isPresent()));
210     }
211
212     private ActorRef actor() {
213         return kit.childActorOf(SegmentedJournalActor.props("foo", DIRECTORY, StorageLevel.DISK, MESSAGE_SIZE,
214             SEGMENT_SIZE, FLUSH_SIZE).withDispatcher(CallingThreadDispatcher.Id()));
215     }
216
217     private void deleteEntries(final long deleteTo) {
218         final AsyncMessage<Void> delete = SegmentedJournalActor.deleteMessagesTo(deleteTo);
219         actor.tell(delete, ActorRef.noSender());
220         assertNull(get(delete));
221     }
222
223     private void assertHighestSequenceNr(final long expected) {
224         AsyncMessage<Long> highest = SegmentedJournalActor.readHighestSequenceNr(0);
225         actor.tell(highest, ActorRef.noSender());
226         assertEquals(expected, (long) get(highest));
227     }
228
229     private void assertReplayCount(final int expected) {
230         // Cast fixes an Eclipse warning 'generic array created'
231         reset((Object) firstCallback);
232         AsyncMessage<Void> replay = SegmentedJournalActor.replayMessages(0, Long.MAX_VALUE, Long.MAX_VALUE,
233             firstCallback);
234         actor.tell(replay, ActorRef.noSender());
235         assertNull(get(replay));
236         verify(firstCallback, times(expected)).accept(any(PersistentRepr.class));
237     }
238
239     private static void assertFileCount(final long dataFiles, final long deleteFiles) throws IOException {
240         List<File> contents = Files.list(DIRECTORY.toPath()).map(Path::toFile).collect(Collectors.toList());
241         assertEquals(dataFiles, contents.stream().filter(file -> file.getName().startsWith("data-")).count());
242         assertEquals(deleteFiles, contents.stream().filter(file -> file.getName().startsWith("delete-")).count());
243     }
244
245     private static <T> T get(final AsyncMessage<T> message) {
246         return getFuture(message.promise.future());
247     }
248
249     private static <T> T getFuture(final Future<T> future) {
250         assertTrue(future.isCompleted());
251         return future.value().get().get();
252     }
253
254     static final class LargePayload implements Serializable {
255         @java.io.Serial
256         private static final long serialVersionUID = 1L;
257
258         final byte[] bytes = new byte[MESSAGE_SIZE / 2];
259     }
260 }