54f6e6d5d80765e04d4a383134ce223138c75898
[controller.git] / atomix-storage / src / test / java / io / atomix / storage / journal / AbstractJournalTest.java
1 /*
2  * Copyright 2017-2021 Open Networking Foundation
3  * Copyright 2023 PANTHEON.tech, s.r.o.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 package io.atomix.storage.journal;
18
19 import static org.junit.Assert.assertEquals;
20 import static org.junit.Assert.assertFalse;
21 import static org.junit.Assert.assertNotNull;
22 import static org.junit.Assert.assertNull;
23 import static org.junit.Assert.assertThrows;
24 import static org.junit.Assert.assertTrue;
25
26 import java.io.IOException;
27 import java.nio.file.FileVisitResult;
28 import java.nio.file.Files;
29 import java.nio.file.Path;
30 import java.nio.file.Paths;
31 import java.nio.file.SimpleFileVisitor;
32 import java.nio.file.attribute.BasicFileAttributes;
33 import java.util.ArrayList;
34 import java.util.List;
35 import java.util.NoSuchElementException;
36 import org.junit.After;
37 import org.junit.Before;
38 import org.junit.Test;
39 import org.junit.runner.RunWith;
40 import org.junit.runners.Parameterized;
41
42 /**
43  * Base journal test.
44  *
45  * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
46  */
47 @RunWith(Parameterized.class)
48 public abstract class AbstractJournalTest {
49     private static final JournalSerdes NAMESPACE = JournalSerdes.builder()
50         .register(new TestEntrySerdes(), TestEntry.class)
51         .register(new ByteArraySerdes(), byte[].class)
52         .build();
53
54     protected static final TestEntry ENTRY = new TestEntry(32);
55     private static final Path PATH = Paths.get("target/test-logs/");
56
57     private final StorageLevel storageLevel;
58     private final int maxSegmentSize;
59     protected final int entriesPerSegment;
60
61     protected AbstractJournalTest(final StorageLevel storageLevel, final int maxSegmentSize) {
62         this.storageLevel = storageLevel;
63         this.maxSegmentSize = maxSegmentSize;
64         int entryLength = NAMESPACE.serialize(ENTRY).length + 8;
65         entriesPerSegment = (maxSegmentSize - 64) / entryLength;
66     }
67
68     @Parameterized.Parameters
69     public static List<Object[]> primeNumbers() {
70         var runs = new ArrayList<Object[]>();
71         for (int i = 1; i <= 10; i++) {
72             for (int j = 1; j <= 10; j++) {
73                 runs.add(new Object[] { 64 + i * (NAMESPACE.serialize(ENTRY).length + 8) + j });
74             }
75         }
76         return runs;
77     }
78
79     protected SegmentedJournal<TestEntry> createJournal() {
80         return SegmentedJournal.<TestEntry>builder()
81             .withName("test")
82             .withDirectory(PATH.toFile())
83             .withNamespace(NAMESPACE)
84             .withStorageLevel(storageLevel)
85             .withMaxSegmentSize(maxSegmentSize)
86             .withIndexDensity(.2)
87             .build();
88     }
89
90     @Test
91     public void testCloseMultipleTimes() {
92         // given
93         final Journal<TestEntry> journal = createJournal();
94
95         // when
96         journal.close();
97
98         // then
99         journal.close();
100     }
101
102     @Test
103     public void testWriteRead() throws Exception {
104         try (Journal<TestEntry> journal = createJournal()) {
105             JournalWriter<TestEntry> writer = journal.writer();
106             JournalReader<TestEntry> reader = journal.openReader(1);
107
108             // Append a couple entries.
109             Indexed<TestEntry> indexed;
110             assertEquals(1, writer.getNextIndex());
111             indexed = writer.append(ENTRY);
112             assertEquals(1, indexed.index());
113
114             assertEquals(2, writer.getNextIndex());
115             writer.append(ENTRY);
116             reader.reset(2);
117             indexed = reader.next();
118             assertEquals(2, indexed.index());
119             assertFalse(reader.hasNext());
120
121             // Test reading an entry
122             Indexed<TestEntry> entry1;
123             reader.reset();
124             entry1 = reader.next();
125             assertEquals(1, entry1.index());
126             assertEquals(entry1, reader.getCurrentEntry());
127             assertEquals(1, reader.getCurrentIndex());
128
129             // Test reading a second entry
130             Indexed<TestEntry> entry2;
131             assertTrue(reader.hasNext());
132             assertEquals(2, reader.getNextIndex());
133             entry2 = reader.next();
134             assertEquals(2, entry2.index());
135             assertEquals(entry2, reader.getCurrentEntry());
136             assertEquals(2, reader.getCurrentIndex());
137             assertFalse(reader.hasNext());
138
139             // Test opening a new reader and reading from the journal.
140             reader = journal.openReader(1);
141             assertTrue(reader.hasNext());
142             entry1 = reader.next();
143             assertEquals(1, entry1.index());
144             assertEquals(entry1, reader.getCurrentEntry());
145             assertEquals(1, reader.getCurrentIndex());
146             assertTrue(reader.hasNext());
147
148             assertTrue(reader.hasNext());
149             assertEquals(2, reader.getNextIndex());
150             entry2 = reader.next();
151             assertEquals(2, entry2.index());
152             assertEquals(entry2, reader.getCurrentEntry());
153             assertEquals(2, reader.getCurrentIndex());
154             assertFalse(reader.hasNext());
155
156             // Reset the reader.
157             reader.reset();
158
159             // Test opening a new reader and reading from the journal.
160             reader = journal.openReader(1);
161             assertTrue(reader.hasNext());
162             entry1 = reader.next();
163             assertEquals(1, entry1.index());
164             assertEquals(entry1, reader.getCurrentEntry());
165             assertEquals(1, reader.getCurrentIndex());
166             assertTrue(reader.hasNext());
167
168             assertTrue(reader.hasNext());
169             assertEquals(2, reader.getNextIndex());
170             entry2 = reader.next();
171             assertEquals(2, entry2.index());
172             assertEquals(entry2, reader.getCurrentEntry());
173             assertEquals(2, reader.getCurrentIndex());
174             assertFalse(reader.hasNext());
175
176             // Truncate the journal and write a different entry.
177             writer.truncate(1);
178             assertEquals(2, writer.getNextIndex());
179             writer.append(ENTRY);
180             reader.reset(2);
181             indexed = reader.next();
182             assertEquals(2, indexed.index());
183
184             // Reset the reader to a specific index and read the last entry again.
185             reader.reset(2);
186
187             assertNotNull(reader.getCurrentEntry());
188             assertEquals(1, reader.getCurrentIndex());
189             assertEquals(1, reader.getCurrentEntry().index());
190             assertTrue(reader.hasNext());
191             assertEquals(2, reader.getNextIndex());
192             entry2 = reader.next();
193             assertEquals(2, entry2.index());
194             assertEquals(entry2, reader.getCurrentEntry());
195             assertEquals(2, reader.getCurrentIndex());
196             assertFalse(reader.hasNext());
197         }
198     }
199
200     @Test
201     public void testResetTruncateZero() throws Exception {
202         try (SegmentedJournal<TestEntry> journal = createJournal()) {
203             JournalWriter<TestEntry> writer = journal.writer();
204             JournalReader<TestEntry> reader = journal.openReader(1);
205
206             assertEquals(0, writer.getLastIndex());
207             writer.append(ENTRY);
208             writer.append(ENTRY);
209             writer.reset(1);
210             assertEquals(0, writer.getLastIndex());
211             writer.append(ENTRY);
212             assertEquals(1, reader.next().index());
213             writer.reset(1);
214             assertEquals(0, writer.getLastIndex());
215             writer.append(ENTRY);
216             assertEquals(1, writer.getLastIndex());
217             assertEquals(1, writer.getLastEntry().index());
218
219             assertTrue(reader.hasNext());
220             assertEquals(1, reader.next().index());
221
222             writer.truncate(0);
223             assertEquals(0, writer.getLastIndex());
224             assertNull(writer.getLastEntry());
225             writer.append(ENTRY);
226             assertEquals(1, writer.getLastIndex());
227             assertEquals(1, writer.getLastEntry().index());
228
229             assertTrue(reader.hasNext());
230             assertEquals(1, reader.next().index());
231         }
232     }
233
234     @Test
235     public void testTruncateRead() throws Exception {
236         int i = 10;
237         try (Journal<TestEntry> journal = createJournal()) {
238             JournalWriter<TestEntry> writer = journal.writer();
239             JournalReader<TestEntry> reader = journal.openReader(1);
240
241             for (int j = 1; j <= i; j++) {
242                 assertEquals(j, writer.append(new TestEntry(32)).index());
243             }
244
245             for (int j = 1; j <= i - 2; j++) {
246                 assertTrue(reader.hasNext());
247                 assertEquals(j, reader.next().index());
248             }
249
250             writer.truncate(i - 2);
251
252             assertFalse(reader.hasNext());
253             assertEquals(i - 1, writer.append(new TestEntry(32)).index());
254             assertEquals(i, writer.append(new TestEntry(32)).index());
255
256             assertTrue(reader.hasNext());
257             Indexed<TestEntry> entry = reader.next();
258             assertEquals(i - 1, entry.index());
259             assertTrue(reader.hasNext());
260             entry = reader.next();
261             assertEquals(i, entry.index());
262         }
263     }
264
265     @Test
266     public void testWriteReadEntries() throws Exception {
267         try (Journal<TestEntry> journal = createJournal()) {
268             JournalWriter<TestEntry> writer = journal.writer();
269             JournalReader<TestEntry> reader = journal.openReader(1);
270
271             for (int i = 1; i <= entriesPerSegment * 5; i++) {
272                 writer.append(ENTRY);
273                 assertTrue(reader.hasNext());
274                 Indexed<TestEntry> entry;
275                 entry = reader.next();
276                 assertEquals(i, entry.index());
277                 assertEquals(32, entry.entry().bytes().length);
278                 reader.reset(i);
279                 entry = reader.next();
280                 assertEquals(i, entry.index());
281                 assertEquals(32, entry.entry().bytes().length);
282
283                 if (i > 6) {
284                     reader.reset(i - 5);
285                     assertNotNull(reader.getCurrentEntry());
286                     assertEquals(i - 6, reader.getCurrentIndex());
287                     assertEquals(i - 6, reader.getCurrentEntry().index());
288                     assertEquals(i - 5, reader.getNextIndex());
289                     reader.reset(i + 1);
290                 }
291
292                 writer.truncate(i - 1);
293                 writer.append(ENTRY);
294
295                 assertTrue(reader.hasNext());
296                 reader.reset(i);
297                 assertTrue(reader.hasNext());
298                 entry = reader.next();
299                 assertEquals(i, entry.index());
300                 assertEquals(32, entry.entry().bytes().length);
301             }
302         }
303     }
304
305     @Test
306     public void testWriteReadCommittedEntries() throws Exception {
307         try (Journal<TestEntry> journal = createJournal()) {
308             JournalWriter<TestEntry> writer = journal.writer();
309             JournalReader<TestEntry> reader = journal.openReader(1, JournalReader.Mode.COMMITS);
310
311             for (int i = 1; i <= entriesPerSegment * 5; i++) {
312                 writer.append(ENTRY);
313                 assertFalse(reader.hasNext());
314                 writer.commit(i);
315                 assertTrue(reader.hasNext());
316                 Indexed<TestEntry> entry;
317                 entry = reader.next();
318                 assertEquals(i, entry.index());
319                 assertEquals(32, entry.entry().bytes().length);
320                 reader.reset(i);
321                 entry = reader.next();
322                 assertEquals(i, entry.index());
323                 assertEquals(32, entry.entry().bytes().length);
324             }
325         }
326     }
327
328     // Same as testWriteReadCommittedEntries(), but does not use hasNext() but checks whether an exception is thrown
329     @Test
330     public void testWriteReadCommittedEntriesException() throws Exception {
331         try (Journal<TestEntry> journal = createJournal()) {
332             JournalWriter<TestEntry> writer = journal.writer();
333             JournalReader<TestEntry> reader = journal.openReader(1, JournalReader.Mode.COMMITS);
334
335             for (int i = 1; i <= entriesPerSegment * 5; i++) {
336                 writer.append(ENTRY);
337                 assertThrows(NoSuchElementException.class, reader::next);
338                 writer.commit(i);
339                 Indexed<TestEntry> entry = reader.next();
340                 assertEquals(i, entry.index());
341                 assertEquals(32, entry.entry().bytes().length);
342                 reader.reset(i);
343                 entry = reader.next();
344                 assertEquals(i, entry.index());
345                 assertEquals(32, entry.entry().bytes().length);
346             }
347         }
348     }
349
350     @Test
351     public void testReadAfterCompact() throws Exception {
352         try (SegmentedJournal<TestEntry> journal = createJournal()) {
353             JournalWriter<TestEntry> writer = journal.writer();
354             JournalReader<TestEntry> uncommittedReader = journal.openReader(1, JournalReader.Mode.ALL);
355             JournalReader<TestEntry> committedReader = journal.openReader(1, JournalReader.Mode.COMMITS);
356
357             for (int i = 1; i <= entriesPerSegment * 10; i++) {
358                 assertEquals(i, writer.append(ENTRY).index());
359             }
360
361             assertEquals(1, uncommittedReader.getNextIndex());
362             assertTrue(uncommittedReader.hasNext());
363             assertEquals(1, committedReader.getNextIndex());
364             assertFalse(committedReader.hasNext());
365
366             writer.commit(entriesPerSegment * 9);
367
368             assertTrue(uncommittedReader.hasNext());
369             assertTrue(committedReader.hasNext());
370
371             for (int i = 1; i <= entriesPerSegment * 2.5; i++) {
372                 assertEquals(i, uncommittedReader.next().index());
373                 assertEquals(i, committedReader.next().index());
374             }
375
376             journal.compact(entriesPerSegment * 5 + 1);
377
378             assertNull(uncommittedReader.getCurrentEntry());
379             assertEquals(0, uncommittedReader.getCurrentIndex());
380             assertTrue(uncommittedReader.hasNext());
381             assertEquals(entriesPerSegment * 5 + 1, uncommittedReader.getNextIndex());
382             assertEquals(entriesPerSegment * 5 + 1, uncommittedReader.next().index());
383
384             assertNull(committedReader.getCurrentEntry());
385             assertEquals(0, committedReader.getCurrentIndex());
386             assertTrue(committedReader.hasNext());
387             assertEquals(entriesPerSegment * 5 + 1, committedReader.getNextIndex());
388             assertEquals(entriesPerSegment * 5 + 1, committedReader.next().index());
389         }
390     }
391
392     /**
393      * Tests reading from a compacted journal.
394      */
395     @Test
396     public void testCompactAndRecover() throws Exception {
397         try (var journal = createJournal()) {
398             // Write three segments to the journal.
399             final var writer = journal.writer();
400             for (int i = 0; i < entriesPerSegment * 3; i++) {
401                 writer.append(ENTRY);
402             }
403
404             // Commit the entries and compact the first segment.
405             writer.commit(entriesPerSegment * 3);
406             journal.compact(entriesPerSegment + 1);
407         }
408
409         // Reopen the journal and create a reader.
410         try (var journal = createJournal()) {
411             final var writer = journal.writer();
412             final var reader = journal.openReader(1, JournalReader.Mode.COMMITS);
413             writer.append(ENTRY);
414             writer.append(ENTRY);
415             writer.commit(entriesPerSegment * 3);
416
417             // Ensure the reader starts at the first physical index in the journal.
418             assertEquals(entriesPerSegment + 1, reader.getNextIndex());
419             assertEquals(reader.getFirstIndex(), reader.getNextIndex());
420             assertTrue(reader.hasNext());
421             assertEquals(entriesPerSegment + 1, reader.getNextIndex());
422             assertEquals(reader.getFirstIndex(), reader.getNextIndex());
423             assertEquals(entriesPerSegment + 1, reader.next().index());
424         }
425     }
426
427     @Before
428     @After
429     public void cleanupStorage() throws IOException {
430         if (Files.exists(PATH)) {
431             Files.walkFileTree(PATH, new SimpleFileVisitor<Path>() {
432                 @Override
433                 public FileVisitResult visitFile(final Path file, final BasicFileAttributes attrs) throws IOException {
434                     Files.delete(file);
435                     return FileVisitResult.CONTINUE;
436                 }
437
438                 @Override
439                 public FileVisitResult postVisitDirectory(final Path dir, final IOException exc) throws IOException {
440                     Files.delete(dir);
441                     return FileVisitResult.CONTINUE;
442                 }
443             });
444         }
445     }
446 }