Enable checkstyle
[controller.git] / atomix-storage / src / test / java / io / atomix / storage / journal / AbstractJournalTest.java
1 /*
2  * Copyright 2017-2021 Open Networking Foundation
3  * Copyright 2023 PANTHEON.tech, s.r.o.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 package io.atomix.storage.journal;
18
19 import static org.junit.Assert.assertArrayEquals;
20 import static org.junit.Assert.assertEquals;
21 import static org.junit.Assert.assertNotNull;
22 import static org.junit.Assert.assertNull;
23
24 import java.io.IOException;
25 import java.nio.file.FileVisitResult;
26 import java.nio.file.Files;
27 import java.nio.file.Path;
28 import java.nio.file.Paths;
29 import java.nio.file.SimpleFileVisitor;
30 import java.nio.file.attribute.BasicFileAttributes;
31 import java.util.ArrayList;
32 import java.util.List;
33 import org.eclipse.jdt.annotation.NonNull;
34 import org.eclipse.jdt.annotation.Nullable;
35 import org.junit.After;
36 import org.junit.Before;
37 import org.junit.Test;
38 import org.junit.runner.RunWith;
39 import org.junit.runners.Parameterized;
40
41 /**
42  * Base journal test.
43  *
44  * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
45  */
46 @RunWith(Parameterized.class)
47 public abstract class AbstractJournalTest {
48     @Deprecated(forRemoval = true, since = "9.0.3")
49     private static final JournalSerdes NAMESPACE = JournalSerdes.builder()
50         .register(new TestEntrySerdes(), TestEntry.class)
51         .register(new ByteArraySerdes(), byte[].class)
52         .build();
53
54     protected static final TestEntry ENTRY = new TestEntry(32);
55     private static final Path PATH = Paths.get("target/test-logs/");
56
57     private final StorageLevel storageLevel;
58     private final int maxSegmentSize;
59     protected final int entriesPerSegment;
60
61     protected AbstractJournalTest(final StorageLevel storageLevel, final int maxSegmentSize) {
62         this.storageLevel = storageLevel;
63         this.maxSegmentSize = maxSegmentSize;
64         int entryLength = NAMESPACE.serialize(ENTRY).length + 8;
65         entriesPerSegment = (maxSegmentSize - 64) / entryLength;
66     }
67
68     @Parameterized.Parameters
69     public static List<Object[]> primeNumbers() {
70         var runs = new ArrayList<Object[]>();
71         for (int i = 1; i <= 10; i++) {
72             for (int j = 1; j <= 10; j++) {
73                 runs.add(new Object[] { 64 + i * (NAMESPACE.serialize(ENTRY).length + 8) + j });
74             }
75         }
76         return runs;
77     }
78
79     private SegmentedJournal<TestEntry> createJournal() {
80         return new SegmentedJournal<>(SegmentedByteBufJournal.builder()
81             .withName("test")
82             .withDirectory(PATH.toFile())
83             .withStorageLevel(storageLevel)
84             .withMaxSegmentSize(maxSegmentSize)
85             .withIndexDensity(.2)
86             .build(), NAMESPACE.toMapper());
87     }
88
89     @Test
90     public void testCloseMultipleTimes() {
91         // given
92         final Journal<TestEntry> journal = createJournal();
93
94         // when
95         journal.close();
96
97         // then
98         journal.close();
99     }
100
101     @Test
102     public void testWriteRead() throws Exception {
103         try (var journal = createJournal()) {
104             final var writer = journal.writer();
105             final var reader = journal.openReader(1);
106
107             // Append a couple entries.
108             assertEquals(1, writer.getNextIndex());
109             var indexed = writer.append(ENTRY);
110             assertEquals(1, indexed.index());
111
112             assertEquals(2, writer.getNextIndex());
113             writer.append(ENTRY);
114             reader.reset(2);
115             indexed = assertNext(reader);
116             assertEquals(2, indexed.index());
117             assertNoNext(reader);
118
119             // Test reading an entry
120             reader.reset();
121             var entry1 = assertNext(reader);
122             assertEquals(1, entry1.index());
123
124             // Test reading a second entry
125             assertEquals(2, reader.getNextIndex());
126             var entry2 = assertNext(reader);
127             assertEquals(2, entry2.index());
128             assertEquals(3, reader.getNextIndex());
129             assertNoNext(reader);
130
131             // Test opening a new reader and reading from the journal.
132             final var reader2 = journal.openReader(1);
133             entry1 = assertNext(reader2);
134             assertEquals(1, entry1.index());
135
136             assertEquals(2, reader2.getNextIndex());
137             entry2 = assertNext(reader2);
138             assertEquals(2, entry2.index());
139             assertNoNext(reader2);
140
141             // Reset the reader.
142             reader2.reset();
143
144             // Test opening a new reader and reading from the journal.
145             final var reader3 = journal.openReader(1);
146             entry1 = assertNext(reader3);
147             assertEquals(1, entry1.index());
148
149             assertEquals(2, reader3.getNextIndex());
150             entry2 = assertNext(reader3);
151             assertEquals(2, entry2.index());
152             assertNoNext(reader3);
153
154             // Truncate the journal and write a different entry.
155             writer.reset(2);
156             assertEquals(2, writer.getNextIndex());
157             writer.append(ENTRY);
158             reader3.reset(2);
159             indexed = assertNext(reader3);
160             assertEquals(2, indexed.index());
161
162             // Reset the reader to a specific index and read the last entry again.
163             reader3.reset(2);
164
165             assertEquals(2, reader3.getNextIndex());
166             entry2 = assertNext(reader3);
167             assertEquals(2, entry2.index());
168             assertNoNext(reader3);
169         }
170     }
171
172     @Test
173     public void testResetTruncateZero() throws Exception {
174         try (var journal = createJournal()) {
175             final var writer = journal.writer();
176             final var reader = journal.openReader(1);
177
178             assertEquals(0, journal.lastIndex());
179             assertEquals(1, writer.getNextIndex());
180             writer.append(ENTRY);
181             writer.append(ENTRY);
182
183             writer.reset(1);
184             assertEquals(0, journal.lastIndex());
185             assertEquals(1, writer.getNextIndex());
186             // Repeat to assert this is a no-op
187             writer.reset(1);
188             assertEquals(0, journal.lastIndex());
189             assertEquals(1, writer.getNextIndex());
190
191             writer.append(ENTRY);
192
193             var indexed = assertNext(reader);
194             assertEquals(1, indexed.index());
195             writer.reset(1);
196             assertEquals(0, journal.lastIndex());
197             assertEquals(1, writer.getNextIndex());
198             indexed = writer.append(ENTRY);
199             assertEquals(1, journal.lastIndex());
200             assertEquals(2, writer.getNextIndex());
201             assertEquals(1, indexed.index());
202
203             indexed = assertNext(reader);
204             assertEquals(1, indexed.index());
205
206             writer.reset(1);
207             assertEquals(0, journal.lastIndex());
208             assertEquals(1, writer.getNextIndex());
209             indexed = writer.append(ENTRY);
210             assertEquals(1, journal.lastIndex());
211             assertEquals(2, writer.getNextIndex());
212             assertEquals(1, indexed.index());
213
214             indexed = assertNext(reader);
215             assertEquals(1, indexed.index());
216         }
217     }
218
219     @Test
220     public void testTruncateRead() throws Exception {
221         final int cnt = 10;
222         try (Journal<TestEntry> journal = createJournal()) {
223             JournalWriter<TestEntry> writer = journal.writer();
224             JournalReader<TestEntry> reader = journal.openReader(1);
225
226             for (int i = 1; i <= cnt; i++) {
227                 assertEquals(i, writer.append(new TestEntry(32)).index());
228             }
229
230             for (int i = 1; i <= cnt - 2; i++) {
231                 assertEquals(i, assertNext(reader).index());
232             }
233
234             writer.reset(cnt - 1);
235
236             assertNoNext(reader);
237             assertEquals(cnt - 1, writer.append(new TestEntry(32)).index());
238             assertEquals(cnt, writer.append(new TestEntry(32)).index());
239
240             var entry = assertNext(reader);
241             assertEquals(cnt - 1, entry.index());
242             entry = assertNext(reader);
243             assertNotNull(entry);
244             assertEquals(cnt, entry.index());
245         }
246     }
247
248     @Test
249     public void testWriteReadEntries() throws Exception {
250         try (Journal<TestEntry> journal = createJournal()) {
251             JournalWriter<TestEntry> writer = journal.writer();
252             JournalReader<TestEntry> reader = journal.openReader(1);
253
254             for (int i = 1; i <= entriesPerSegment * 5; i++) {
255                 writer.append(ENTRY);
256                 var entry = assertNext(reader);
257                 assertEquals(i, entry.index());
258                 assertArrayEquals(ENTRY.bytes(), entry.entry().bytes());
259                 reader.reset(i);
260                 entry = assertNext(reader);
261                 assertEquals(i, entry.index());
262                 assertArrayEquals(ENTRY.bytes(), entry.entry().bytes());
263
264                 if (i > 6) {
265                     reader.reset(i - 5);
266                     assertEquals(i - 5, reader.getNextIndex());
267                     assertNext(reader);
268                     reader.reset(i + 1);
269                 }
270
271                 writer.reset(i);
272                 writer.append(ENTRY);
273
274                 assertNext(reader);
275                 reader.reset(i);
276                 entry = assertNext(reader);
277                 assertEquals(i, entry.index());
278                 assertArrayEquals(ENTRY.bytes(), entry.entry().bytes());
279             }
280         }
281     }
282
283     @Test
284     public void testWriteReadCommittedEntries() throws Exception {
285         try (Journal<TestEntry> journal = createJournal()) {
286             JournalWriter<TestEntry> writer = journal.writer();
287             JournalReader<TestEntry> reader = journal.openReader(1, JournalReader.Mode.COMMITS);
288
289             for (int i = 1; i <= entriesPerSegment * 5; i++) {
290                 writer.append(ENTRY);
291                 assertNoNext(reader);
292                 writer.commit(i);
293                 var entry = assertNext(reader);
294                 assertEquals(i, entry.index());
295                 assertArrayEquals(ENTRY.bytes(), entry.entry().bytes());
296                 reader.reset(i);
297                 entry = assertNext(reader);
298                 assertEquals(i, entry.index());
299                 assertArrayEquals(ENTRY.bytes(), entry.entry().bytes());
300             }
301         }
302     }
303
304     @Test
305     public void testReadAfterCompact() throws Exception {
306         try (SegmentedJournal<TestEntry> journal = createJournal()) {
307             JournalWriter<TestEntry> writer = journal.writer();
308             JournalReader<TestEntry> uncommittedReader = journal.openReader(1, JournalReader.Mode.ALL);
309             JournalReader<TestEntry> committedReader = journal.openReader(1, JournalReader.Mode.COMMITS);
310
311             for (int i = 1; i <= entriesPerSegment * 10; i++) {
312                 assertEquals(i, writer.append(ENTRY).index());
313             }
314
315             assertEquals(1, uncommittedReader.getNextIndex());
316             assertEquals(1, committedReader.getNextIndex());
317
318             // This creates asymmetry, as uncommitted reader will move one step ahead...
319             assertNext(uncommittedReader);
320             assertEquals(2, uncommittedReader.getNextIndex());
321             assertNoNext(committedReader);
322             assertEquals(1, committedReader.getNextIndex());
323
324             writer.commit(entriesPerSegment * 9);
325
326             // ... so here we catch up ...
327             assertNext(committedReader);
328             assertEquals(2, committedReader.getNextIndex());
329
330             // ... and continue from the second entry
331             for (int i = 2; i <= entriesPerSegment * 2.5; i++) {
332                 var entry = assertNext(uncommittedReader);
333                 assertEquals(i, entry.index());
334
335                 entry = assertNext(committedReader);
336                 assertEquals(i, entry.index());
337             }
338
339             journal.compact(entriesPerSegment * 5 + 1);
340
341             assertEquals(entriesPerSegment * 5 + 1, uncommittedReader.getNextIndex());
342             var entry = assertNext(uncommittedReader);
343             assertEquals(entriesPerSegment * 5 + 1, entry.index());
344
345             assertEquals(entriesPerSegment * 5 + 1, committedReader.getNextIndex());
346             entry = assertNext(committedReader);
347             assertEquals(entriesPerSegment * 5 + 1, entry.index());
348         }
349     }
350
351     /**
352      * Tests reading from a compacted journal.
353      */
354     @Test
355     public void testCompactAndRecover() throws Exception {
356         try (var journal = createJournal()) {
357             // Write three segments to the journal.
358             final var writer = journal.writer();
359             for (int i = 0; i < entriesPerSegment * 3; i++) {
360                 writer.append(ENTRY);
361             }
362
363             // Commit the entries and compact the first segment.
364             writer.commit(entriesPerSegment * 3);
365             journal.compact(entriesPerSegment + 1);
366         }
367
368         // Reopen the journal and create a reader.
369         try (var journal = createJournal()) {
370             final var writer = journal.writer();
371             final var reader = journal.openReader(1, JournalReader.Mode.COMMITS);
372             writer.append(ENTRY);
373             writer.append(ENTRY);
374             writer.commit(entriesPerSegment * 3);
375
376             // Ensure the reader starts at the first physical index in the journal.
377             assertEquals(entriesPerSegment + 1, reader.getNextIndex());
378             assertEquals(reader.getFirstIndex(), reader.getNextIndex());
379             assertEquals(entriesPerSegment + 1, assertNext(reader).index());
380             assertEquals(entriesPerSegment + 2, reader.getNextIndex());
381         }
382     }
383
384     @Before
385     @After
386     public void cleanupStorage() throws IOException {
387         if (Files.exists(PATH)) {
388             Files.walkFileTree(PATH, new SimpleFileVisitor<Path>() {
389                 @Override
390                 public FileVisitResult visitFile(final Path file, final BasicFileAttributes attrs) throws IOException {
391                     Files.delete(file);
392                     return FileVisitResult.CONTINUE;
393                 }
394
395                 @Override
396                 public FileVisitResult postVisitDirectory(final Path dir, final IOException exc) throws IOException {
397                     Files.delete(dir);
398                     return FileVisitResult.CONTINUE;
399                 }
400             });
401         }
402     }
403
404     private static @NonNull Indexed<TestEntry> assertNext(final JournalReader<TestEntry> reader) {
405         final var ret = tryNext(reader);
406         assertNotNull(ret);
407         return ret;
408     }
409
410     private static void assertNoNext(final JournalReader<TestEntry> reader) {
411         assertNull(tryNext(reader));
412     }
413
414     private static @Nullable Indexed<TestEntry> tryNext(final JournalReader<TestEntry> reader) {
415         return reader.tryNext(Indexed::new);
416     }
417 }