Maintain last known position in JournalIndex
[controller.git] / atomix-storage / src / test / java / io / atomix / storage / journal / AbstractJournalTest.java
1 /*
2  * Copyright 2017-2021 Open Networking Foundation
3  * Copyright 2023 PANTHEON.tech, s.r.o.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 package io.atomix.storage.journal;
18
19 import static org.junit.Assert.assertArrayEquals;
20 import static org.junit.Assert.assertEquals;
21 import static org.junit.Assert.assertNotNull;
22 import static org.junit.Assert.assertNull;
23
24 import java.io.IOException;
25 import java.nio.file.FileVisitResult;
26 import java.nio.file.Files;
27 import java.nio.file.Path;
28 import java.nio.file.Paths;
29 import java.nio.file.SimpleFileVisitor;
30 import java.nio.file.attribute.BasicFileAttributes;
31 import java.util.ArrayList;
32 import java.util.List;
33 import org.eclipse.jdt.annotation.NonNull;
34 import org.eclipse.jdt.annotation.Nullable;
35 import org.junit.After;
36 import org.junit.Before;
37 import org.junit.Test;
38 import org.junit.runner.RunWith;
39 import org.junit.runners.Parameterized;
40
41 /**
42  * Base journal test.
43  *
44  * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
45  */
46 @RunWith(Parameterized.class)
47 public abstract class AbstractJournalTest {
48     @Deprecated(forRemoval = true, since="9.0.3")
49     private static final JournalSerdes NAMESPACE = JournalSerdes.builder()
50         .register(new TestEntrySerdes(), TestEntry.class)
51         .register(new ByteArraySerdes(), byte[].class)
52         .build();
53
54     protected static final TestEntry ENTRY = new TestEntry(32);
55     private static final Path PATH = Paths.get("target/test-logs/");
56
57     private final StorageLevel storageLevel;
58     private final int maxSegmentSize;
59     protected final int entriesPerSegment;
60
61     protected AbstractJournalTest(final StorageLevel storageLevel, final int maxSegmentSize) {
62         this.storageLevel = storageLevel;
63         this.maxSegmentSize = maxSegmentSize;
64         int entryLength = NAMESPACE.serialize(ENTRY).length + 8;
65         entriesPerSegment = (maxSegmentSize - 64) / entryLength;
66     }
67
68     @Parameterized.Parameters
69     public static List<Object[]> primeNumbers() {
70         var runs = new ArrayList<Object[]>();
71         for (int i = 1; i <= 10; i++) {
72             for (int j = 1; j <= 10; j++) {
73                 runs.add(new Object[] { 64 + i * (NAMESPACE.serialize(ENTRY).length + 8) + j });
74             }
75         }
76         return runs;
77     }
78
79     protected SegmentedJournal<TestEntry> createJournal() {
80         return SegmentedJournal.<TestEntry>builder()
81             .withName("test")
82             .withDirectory(PATH.toFile())
83             .withNamespace(NAMESPACE)
84             .withStorageLevel(storageLevel)
85             .withMaxSegmentSize(maxSegmentSize)
86             .withIndexDensity(.2)
87             .build();
88     }
89
90     @Test
91     public void testCloseMultipleTimes() {
92         // given
93         final Journal<TestEntry> journal = createJournal();
94
95         // when
96         journal.close();
97
98         // then
99         journal.close();
100     }
101
102     @Test
103     public void testWriteRead() throws Exception {
104         try (Journal<TestEntry> journal = createJournal()) {
105             JournalWriter<TestEntry> writer = journal.writer();
106             JournalReader<TestEntry> reader = journal.openReader(1);
107
108             // Append a couple entries.
109             assertEquals(1, writer.getNextIndex());
110             var indexed = writer.append(ENTRY);
111             assertEquals(1, indexed.index());
112
113             assertEquals(2, writer.getNextIndex());
114             writer.append(ENTRY);
115             reader.reset(2);
116             indexed = assertNext(reader);
117             assertEquals(2, indexed.index());
118             assertNoNext(reader);
119
120             // Test reading an entry
121             reader.reset();
122             var entry1 = assertNext(reader);
123             assertEquals(1, entry1.index());
124
125             // Test reading a second entry
126             assertEquals(2, reader.getNextIndex());
127             var entry2 = assertNext(reader);
128             assertEquals(2, entry2.index());
129             assertEquals(3, reader.getNextIndex());
130             assertNoNext(reader);
131
132             // Test opening a new reader and reading from the journal.
133             reader = journal.openReader(1);
134             entry1 = assertNext(reader);
135             assertEquals(1, entry1.index());
136
137             assertEquals(2, reader.getNextIndex());
138             entry2 = assertNext(reader);
139             assertEquals(2, entry2.index());
140             assertNoNext(reader);
141
142             // Reset the reader.
143             reader.reset();
144
145             // Test opening a new reader and reading from the journal.
146             reader = journal.openReader(1);
147             entry1 = assertNext(reader);
148             assertEquals(1, entry1.index());
149
150             assertEquals(2, reader.getNextIndex());
151             entry2 = assertNext(reader);
152             assertEquals(2, entry2.index());
153             assertNoNext(reader);
154
155             // Truncate the journal and write a different entry.
156             writer.truncate(1);
157             assertEquals(2, writer.getNextIndex());
158             writer.append(ENTRY);
159             reader.reset(2);
160             indexed = assertNext(reader);
161             assertEquals(2, indexed.index());
162
163             // Reset the reader to a specific index and read the last entry again.
164             reader.reset(2);
165
166             assertEquals(2, reader.getNextIndex());
167             entry2 = assertNext(reader);
168             assertEquals(2, entry2.index());
169             assertNoNext(reader);
170         }
171     }
172
173     @Test
174     public void testResetTruncateZero() throws Exception {
175         try (SegmentedJournal<TestEntry> journal = createJournal()) {
176             JournalWriter<TestEntry> writer = journal.writer();
177             JournalReader<TestEntry> reader = journal.openReader(1);
178
179             assertEquals(0, writer.getLastIndex());
180             writer.append(ENTRY);
181             writer.append(ENTRY);
182             writer.reset(1);
183             assertEquals(0, writer.getLastIndex());
184             writer.append(ENTRY);
185
186             var indexed = assertNext(reader);
187             assertEquals(1, indexed.index());
188             writer.reset(1);
189             assertEquals(0, writer.getLastIndex());
190             indexed = writer.append(ENTRY);
191             assertEquals(1, writer.getLastIndex());
192             assertEquals(1, indexed.index());
193
194             indexed = assertNext(reader);
195             assertEquals(1, indexed.index());
196
197             writer.truncate(0);
198             assertEquals(0, writer.getLastIndex());
199             indexed = writer.append(ENTRY);
200             assertEquals(1, writer.getLastIndex());
201             assertEquals(1, indexed.index());
202
203             indexed = assertNext(reader);
204             assertEquals(1, indexed.index());
205         }
206     }
207
208     @Test
209     public void testTruncateRead() throws Exception {
210         int i = 10;
211         try (Journal<TestEntry> journal = createJournal()) {
212             JournalWriter<TestEntry> writer = journal.writer();
213             JournalReader<TestEntry> reader = journal.openReader(1);
214
215             for (int j = 1; j <= i; j++) {
216                 assertEquals(j, writer.append(new TestEntry(32)).index());
217             }
218
219             for (int j = 1; j <= i - 2; j++) {
220                 assertEquals(j, assertNext(reader).index());
221             }
222
223             writer.truncate(i - 2);
224
225             assertNoNext(reader);
226             assertEquals(i - 1, writer.append(new TestEntry(32)).index());
227             assertEquals(i, writer.append(new TestEntry(32)).index());
228
229             var entry = assertNext(reader);
230             assertEquals(i - 1, entry.index());
231             entry = assertNext(reader);
232             assertNotNull(entry);
233             assertEquals(i, entry.index());
234         }
235     }
236
237     @Test
238     public void testWriteReadEntries() throws Exception {
239         try (Journal<TestEntry> journal = createJournal()) {
240             JournalWriter<TestEntry> writer = journal.writer();
241             JournalReader<TestEntry> reader = journal.openReader(1);
242
243             for (int i = 1; i <= entriesPerSegment * 5; i++) {
244                 writer.append(ENTRY);
245                 var entry = assertNext(reader);
246                 assertEquals(i, entry.index());
247                 assertArrayEquals(ENTRY.bytes(), entry.entry().bytes());
248                 reader.reset(i);
249                 entry = assertNext(reader);
250                 assertEquals(i, entry.index());
251                 assertArrayEquals(ENTRY.bytes(), entry.entry().bytes());
252
253                 if (i > 6) {
254                     reader.reset(i - 5);
255                     assertEquals(i - 5, reader.getNextIndex());
256                     assertNext(reader);
257                     reader.reset(i + 1);
258                 }
259
260                 writer.truncate(i - 1);
261                 writer.append(ENTRY);
262
263                 assertNext(reader);
264                 reader.reset(i);
265                 entry = assertNext(reader);
266                 assertEquals(i, entry.index());
267                 assertArrayEquals(ENTRY.bytes(), entry.entry().bytes());
268             }
269         }
270     }
271
272     @Test
273     public void testWriteReadCommittedEntries() throws Exception {
274         try (Journal<TestEntry> journal = createJournal()) {
275             JournalWriter<TestEntry> writer = journal.writer();
276             JournalReader<TestEntry> reader = journal.openReader(1, JournalReader.Mode.COMMITS);
277
278             for (int i = 1; i <= entriesPerSegment * 5; i++) {
279                 writer.append(ENTRY);
280                 assertNoNext(reader);
281                 writer.commit(i);
282                 var entry = assertNext(reader);
283                 assertEquals(i, entry.index());
284                 assertArrayEquals(ENTRY.bytes(), entry.entry().bytes());
285                 reader.reset(i);
286                 entry = assertNext(reader);
287                 assertEquals(i, entry.index());
288                 assertArrayEquals(ENTRY.bytes(), entry.entry().bytes());
289             }
290         }
291     }
292
293     @Test
294     public void testReadAfterCompact() throws Exception {
295         try (SegmentedJournal<TestEntry> journal = createJournal()) {
296             JournalWriter<TestEntry> writer = journal.writer();
297             JournalReader<TestEntry> uncommittedReader = journal.openReader(1, JournalReader.Mode.ALL);
298             JournalReader<TestEntry> committedReader = journal.openReader(1, JournalReader.Mode.COMMITS);
299
300             for (int i = 1; i <= entriesPerSegment * 10; i++) {
301                 assertEquals(i, writer.append(ENTRY).index());
302             }
303
304             assertEquals(1, uncommittedReader.getNextIndex());
305             assertEquals(1, committedReader.getNextIndex());
306
307             // This creates asymmetry, as uncommitted reader will move one step ahead...
308             assertNext(uncommittedReader);
309             assertEquals(2, uncommittedReader.getNextIndex());
310             assertNoNext(committedReader);
311             assertEquals(1, committedReader.getNextIndex());
312
313             writer.commit(entriesPerSegment * 9);
314
315             // ... so here we catch up ...
316             assertNext(committedReader);
317             assertEquals(2, committedReader.getNextIndex());
318
319             // ... and continue from the second entry
320             for (int i = 2; i <= entriesPerSegment * 2.5; i++) {
321                 var entry = assertNext(uncommittedReader);
322                 assertEquals(i, entry.index());
323
324                 entry = assertNext(committedReader);
325                 assertEquals(i, entry.index());
326             }
327
328             journal.compact(entriesPerSegment * 5 + 1);
329
330             assertEquals(entriesPerSegment * 5 + 1, uncommittedReader.getNextIndex());
331             var entry = assertNext(uncommittedReader);
332             assertEquals(entriesPerSegment * 5 + 1, entry.index());
333
334             assertEquals(entriesPerSegment * 5 + 1, committedReader.getNextIndex());
335             entry = assertNext(committedReader);
336             assertEquals(entriesPerSegment * 5 + 1, entry.index());
337         }
338     }
339
340     /**
341      * Tests reading from a compacted journal.
342      */
343     @Test
344     public void testCompactAndRecover() throws Exception {
345         try (var journal = createJournal()) {
346             // Write three segments to the journal.
347             final var writer = journal.writer();
348             for (int i = 0; i < entriesPerSegment * 3; i++) {
349                 writer.append(ENTRY);
350             }
351
352             // Commit the entries and compact the first segment.
353             writer.commit(entriesPerSegment * 3);
354             journal.compact(entriesPerSegment + 1);
355         }
356
357         // Reopen the journal and create a reader.
358         try (var journal = createJournal()) {
359             final var writer = journal.writer();
360             final var reader = journal.openReader(1, JournalReader.Mode.COMMITS);
361             writer.append(ENTRY);
362             writer.append(ENTRY);
363             writer.commit(entriesPerSegment * 3);
364
365             // Ensure the reader starts at the first physical index in the journal.
366             assertEquals(entriesPerSegment + 1, reader.getNextIndex());
367             assertEquals(reader.getFirstIndex(), reader.getNextIndex());
368             assertEquals(entriesPerSegment + 1, assertNext(reader).index());
369             assertEquals(entriesPerSegment + 2, reader.getNextIndex());
370         }
371     }
372
373     @Before
374     @After
375     public void cleanupStorage() throws IOException {
376         if (Files.exists(PATH)) {
377             Files.walkFileTree(PATH, new SimpleFileVisitor<Path>() {
378                 @Override
379                 public FileVisitResult visitFile(final Path file, final BasicFileAttributes attrs) throws IOException {
380                     Files.delete(file);
381                     return FileVisitResult.CONTINUE;
382                 }
383
384                 @Override
385                 public FileVisitResult postVisitDirectory(final Path dir, final IOException exc) throws IOException {
386                     Files.delete(dir);
387                     return FileVisitResult.CONTINUE;
388                 }
389             });
390         }
391     }
392
393     private static @NonNull Indexed<TestEntry> assertNext(final JournalReader<TestEntry> reader) {
394         final var ret = tryNext(reader);
395         assertNotNull(ret);
396         return ret;
397     }
398
399     private static void assertNoNext(final JournalReader<TestEntry> reader) {
400         assertNull(tryNext(reader));
401     }
402
403     private static @Nullable Indexed<TestEntry> tryNext(final JournalReader<TestEntry> reader) {
404         return reader.tryNext(Indexed::new);
405     }
406 }