05a758a366c4ce72ad1160ec1e6177f516febfc4
[controller.git] / atomix-storage / src / test / java / io / atomix / storage / journal / AbstractJournalTest.java
1 /*
2  * Copyright 2017-2021 Open Networking Foundation
3  * Copyright 2023 PANTHEON.tech, s.r.o.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 package io.atomix.storage.journal;
18
19 import static org.junit.Assert.assertArrayEquals;
20 import static org.junit.Assert.assertEquals;
21 import static org.junit.Assert.assertNotNull;
22 import static org.junit.Assert.assertNull;
23
24 import java.io.IOException;
25 import java.nio.file.FileVisitResult;
26 import java.nio.file.Files;
27 import java.nio.file.Path;
28 import java.nio.file.Paths;
29 import java.nio.file.SimpleFileVisitor;
30 import java.nio.file.attribute.BasicFileAttributes;
31 import java.util.ArrayList;
32 import java.util.List;
33 import org.eclipse.jdt.annotation.NonNull;
34 import org.eclipse.jdt.annotation.Nullable;
35 import org.junit.After;
36 import org.junit.Before;
37 import org.junit.Test;
38 import org.junit.runner.RunWith;
39 import org.junit.runners.Parameterized;
40
41 /**
42  * Base journal test.
43  *
44  * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
45  */
46 @RunWith(Parameterized.class)
47 public abstract class AbstractJournalTest {
48     @Deprecated(forRemoval = true, since = "9.0.3")
49     private static final JournalSerdes NAMESPACE = JournalSerdes.builder()
50         .register(new TestEntrySerdes(), TestEntry.class)
51         .register(new ByteArraySerdes(), byte[].class)
52         .build();
53
54     protected static final TestEntry ENTRY = new TestEntry(32);
55     private static final Path PATH = Paths.get("target/test-logs/");
56
57     private final StorageLevel storageLevel;
58     private final int maxSegmentSize;
59     protected final int entriesPerSegment;
60
61     protected AbstractJournalTest(final StorageLevel storageLevel, final int maxSegmentSize) {
62         this.storageLevel = storageLevel;
63         this.maxSegmentSize = maxSegmentSize;
64         int entryLength = NAMESPACE.serialize(ENTRY).length + 8;
65         entriesPerSegment = (maxSegmentSize - 64) / entryLength;
66     }
67
68     @Parameterized.Parameters
69     public static List<Object[]> primeNumbers() {
70         var runs = new ArrayList<Object[]>();
71         for (int i = 1; i <= 10; i++) {
72             for (int j = 1; j <= 10; j++) {
73                 runs.add(new Object[] { 64 + i * (NAMESPACE.serialize(ENTRY).length + 8) + j });
74             }
75         }
76         return runs;
77     }
78
79     protected SegmentedJournal<TestEntry> createJournal() {
80         return SegmentedJournal.<TestEntry>builder()
81             .withName("test")
82             .withDirectory(PATH.toFile())
83             .withNamespace(NAMESPACE)
84             .withStorageLevel(storageLevel)
85             .withMaxSegmentSize(maxSegmentSize)
86             .withIndexDensity(.2)
87             .build();
88     }
89
90     @Test
91     public void testCloseMultipleTimes() {
92         // given
93         final Journal<TestEntry> journal = createJournal();
94
95         // when
96         journal.close();
97
98         // then
99         journal.close();
100     }
101
102     @Test
103     public void testWriteRead() throws Exception {
104         try (var journal = createJournal()) {
105             var writer = journal.writer();
106             var reader = journal.openReader(1);
107
108             // Append a couple entries.
109             assertEquals(1, writer.getNextIndex());
110             var indexed = writer.append(ENTRY);
111             assertEquals(1, indexed.index());
112
113             assertEquals(2, writer.getNextIndex());
114             writer.append(ENTRY);
115             reader.reset(2);
116             indexed = assertNext(reader);
117             assertEquals(2, indexed.index());
118             assertNoNext(reader);
119
120             // Test reading an entry
121             reader.reset();
122             var entry1 = assertNext(reader);
123             assertEquals(1, entry1.index());
124
125             // Test reading a second entry
126             assertEquals(2, reader.getNextIndex());
127             var entry2 = assertNext(reader);
128             assertEquals(2, entry2.index());
129             assertEquals(3, reader.getNextIndex());
130             assertNoNext(reader);
131
132             // Test opening a new reader and reading from the journal.
133             reader = journal.openReader(1);
134             entry1 = assertNext(reader);
135             assertEquals(1, entry1.index());
136
137             assertEquals(2, reader.getNextIndex());
138             entry2 = assertNext(reader);
139             assertEquals(2, entry2.index());
140             assertNoNext(reader);
141
142             // Reset the reader.
143             reader.reset();
144
145             // Test opening a new reader and reading from the journal.
146             reader = journal.openReader(1);
147             entry1 = assertNext(reader);
148             assertEquals(1, entry1.index());
149
150             assertEquals(2, reader.getNextIndex());
151             entry2 = assertNext(reader);
152             assertEquals(2, entry2.index());
153             assertNoNext(reader);
154
155             // Truncate the journal and write a different entry.
156             writer.reset(2);
157             assertEquals(2, writer.getNextIndex());
158             writer.append(ENTRY);
159             reader.reset(2);
160             indexed = assertNext(reader);
161             assertEquals(2, indexed.index());
162
163             // Reset the reader to a specific index and read the last entry again.
164             reader.reset(2);
165
166             assertEquals(2, reader.getNextIndex());
167             entry2 = assertNext(reader);
168             assertEquals(2, entry2.index());
169             assertNoNext(reader);
170         }
171     }
172
173     @Test
174     public void testResetTruncateZero() throws Exception {
175         try (SegmentedJournal<TestEntry> journal = createJournal()) {
176             JournalWriter<TestEntry> writer = journal.writer();
177             JournalReader<TestEntry> reader = journal.openReader(1);
178
179             assertEquals(0, journal.lastIndex());
180             assertEquals(1, writer.getNextIndex());
181             writer.append(ENTRY);
182             writer.append(ENTRY);
183
184             writer.reset(1);
185             assertEquals(0, journal.lastIndex());
186             assertEquals(1, writer.getNextIndex());
187             // Repeat to assert this is a no-op
188             writer.reset(1);
189             assertEquals(0, journal.lastIndex());
190             assertEquals(1, writer.getNextIndex());
191
192             writer.append(ENTRY);
193
194             var indexed = assertNext(reader);
195             assertEquals(1, indexed.index());
196             writer.reset(1);
197             assertEquals(0, journal.lastIndex());
198             assertEquals(1, writer.getNextIndex());
199             indexed = writer.append(ENTRY);
200             assertEquals(1, journal.lastIndex());
201             assertEquals(2, writer.getNextIndex());
202             assertEquals(1, indexed.index());
203
204             indexed = assertNext(reader);
205             assertEquals(1, indexed.index());
206
207             writer.reset(1);
208             assertEquals(0, journal.lastIndex());
209             assertEquals(1, writer.getNextIndex());
210             indexed = writer.append(ENTRY);
211             assertEquals(1, journal.lastIndex());
212             assertEquals(2, writer.getNextIndex());
213             assertEquals(1, indexed.index());
214
215             indexed = assertNext(reader);
216             assertEquals(1, indexed.index());
217         }
218     }
219
220     @Test
221     public void testTruncateRead() throws Exception {
222         final int cnt = 10;
223         try (Journal<TestEntry> journal = createJournal()) {
224             JournalWriter<TestEntry> writer = journal.writer();
225             JournalReader<TestEntry> reader = journal.openReader(1);
226
227             for (int i = 1; i <= cnt; i++) {
228                 assertEquals(i, writer.append(new TestEntry(32)).index());
229             }
230
231             for (int i = 1; i <= cnt - 2; i++) {
232                 assertEquals(i, assertNext(reader).index());
233             }
234
235             writer.reset(cnt - 1);
236
237             assertNoNext(reader);
238             assertEquals(cnt - 1, writer.append(new TestEntry(32)).index());
239             assertEquals(cnt, writer.append(new TestEntry(32)).index());
240
241             var entry = assertNext(reader);
242             assertEquals(cnt - 1, entry.index());
243             entry = assertNext(reader);
244             assertNotNull(entry);
245             assertEquals(cnt, entry.index());
246         }
247     }
248
249     @Test
250     public void testWriteReadEntries() throws Exception {
251         try (Journal<TestEntry> journal = createJournal()) {
252             JournalWriter<TestEntry> writer = journal.writer();
253             JournalReader<TestEntry> reader = journal.openReader(1);
254
255             for (int i = 1; i <= entriesPerSegment * 5; i++) {
256                 writer.append(ENTRY);
257                 var entry = assertNext(reader);
258                 assertEquals(i, entry.index());
259                 assertArrayEquals(ENTRY.bytes(), entry.entry().bytes());
260                 reader.reset(i);
261                 entry = assertNext(reader);
262                 assertEquals(i, entry.index());
263                 assertArrayEquals(ENTRY.bytes(), entry.entry().bytes());
264
265                 if (i > 6) {
266                     reader.reset(i - 5);
267                     assertEquals(i - 5, reader.getNextIndex());
268                     assertNext(reader);
269                     reader.reset(i + 1);
270                 }
271
272                 writer.reset(i);
273                 writer.append(ENTRY);
274
275                 assertNext(reader);
276                 reader.reset(i);
277                 entry = assertNext(reader);
278                 assertEquals(i, entry.index());
279                 assertArrayEquals(ENTRY.bytes(), entry.entry().bytes());
280             }
281         }
282     }
283
284     @Test
285     public void testWriteReadCommittedEntries() throws Exception {
286         try (Journal<TestEntry> journal = createJournal()) {
287             JournalWriter<TestEntry> writer = journal.writer();
288             JournalReader<TestEntry> reader = journal.openReader(1, JournalReader.Mode.COMMITS);
289
290             for (int i = 1; i <= entriesPerSegment * 5; i++) {
291                 writer.append(ENTRY);
292                 assertNoNext(reader);
293                 writer.commit(i);
294                 var entry = assertNext(reader);
295                 assertEquals(i, entry.index());
296                 assertArrayEquals(ENTRY.bytes(), entry.entry().bytes());
297                 reader.reset(i);
298                 entry = assertNext(reader);
299                 assertEquals(i, entry.index());
300                 assertArrayEquals(ENTRY.bytes(), entry.entry().bytes());
301             }
302         }
303     }
304
305     @Test
306     public void testReadAfterCompact() throws Exception {
307         try (SegmentedJournal<TestEntry> journal = createJournal()) {
308             JournalWriter<TestEntry> writer = journal.writer();
309             JournalReader<TestEntry> uncommittedReader = journal.openReader(1, JournalReader.Mode.ALL);
310             JournalReader<TestEntry> committedReader = journal.openReader(1, JournalReader.Mode.COMMITS);
311
312             for (int i = 1; i <= entriesPerSegment * 10; i++) {
313                 assertEquals(i, writer.append(ENTRY).index());
314             }
315
316             assertEquals(1, uncommittedReader.getNextIndex());
317             assertEquals(1, committedReader.getNextIndex());
318
319             // This creates asymmetry, as uncommitted reader will move one step ahead...
320             assertNext(uncommittedReader);
321             assertEquals(2, uncommittedReader.getNextIndex());
322             assertNoNext(committedReader);
323             assertEquals(1, committedReader.getNextIndex());
324
325             writer.commit(entriesPerSegment * 9);
326
327             // ... so here we catch up ...
328             assertNext(committedReader);
329             assertEquals(2, committedReader.getNextIndex());
330
331             // ... and continue from the second entry
332             for (int i = 2; i <= entriesPerSegment * 2.5; i++) {
333                 var entry = assertNext(uncommittedReader);
334                 assertEquals(i, entry.index());
335
336                 entry = assertNext(committedReader);
337                 assertEquals(i, entry.index());
338             }
339
340             journal.compact(entriesPerSegment * 5 + 1);
341
342             assertEquals(entriesPerSegment * 5 + 1, uncommittedReader.getNextIndex());
343             var entry = assertNext(uncommittedReader);
344             assertEquals(entriesPerSegment * 5 + 1, entry.index());
345
346             assertEquals(entriesPerSegment * 5 + 1, committedReader.getNextIndex());
347             entry = assertNext(committedReader);
348             assertEquals(entriesPerSegment * 5 + 1, entry.index());
349         }
350     }
351
352     /**
353      * Tests reading from a compacted journal.
354      */
355     @Test
356     public void testCompactAndRecover() throws Exception {
357         try (var journal = createJournal()) {
358             // Write three segments to the journal.
359             final var writer = journal.writer();
360             for (int i = 0; i < entriesPerSegment * 3; i++) {
361                 writer.append(ENTRY);
362             }
363
364             // Commit the entries and compact the first segment.
365             writer.commit(entriesPerSegment * 3);
366             journal.compact(entriesPerSegment + 1);
367         }
368
369         // Reopen the journal and create a reader.
370         try (var journal = createJournal()) {
371             final var writer = journal.writer();
372             final var reader = journal.openReader(1, JournalReader.Mode.COMMITS);
373             writer.append(ENTRY);
374             writer.append(ENTRY);
375             writer.commit(entriesPerSegment * 3);
376
377             // Ensure the reader starts at the first physical index in the journal.
378             assertEquals(entriesPerSegment + 1, reader.getNextIndex());
379             assertEquals(reader.getFirstIndex(), reader.getNextIndex());
380             assertEquals(entriesPerSegment + 1, assertNext(reader).index());
381             assertEquals(entriesPerSegment + 2, reader.getNextIndex());
382         }
383     }
384
385     @Before
386     @After
387     public void cleanupStorage() throws IOException {
388         if (Files.exists(PATH)) {
389             Files.walkFileTree(PATH, new SimpleFileVisitor<Path>() {
390                 @Override
391                 public FileVisitResult visitFile(final Path file, final BasicFileAttributes attrs) throws IOException {
392                     Files.delete(file);
393                     return FileVisitResult.CONTINUE;
394                 }
395
396                 @Override
397                 public FileVisitResult postVisitDirectory(final Path dir, final IOException exc) throws IOException {
398                     Files.delete(dir);
399                     return FileVisitResult.CONTINUE;
400                 }
401             });
402         }
403     }
404
405     private static @NonNull Indexed<TestEntry> assertNext(final JournalReader<TestEntry> reader) {
406         final var ret = tryNext(reader);
407         assertNotNull(ret);
408         return ret;
409     }
410
411     private static void assertNoNext(final JournalReader<TestEntry> reader) {
412         assertNull(tryNext(reader));
413     }
414
415     private static @Nullable Indexed<TestEntry> tryNext(final JournalReader<TestEntry> reader) {
416         return reader.tryNext(Indexed::new);
417     }
418 }