222d0bfe391307e7b49528be431cf82c37523665
[controller.git] / third-party / atomix / storage / src / test / java / io / atomix / storage / journal / AbstractJournalTest.java
1 /*
2  * Copyright 2017-2021 Open Networking Foundation
3  * Copyright 2023 PANTHEON.tech, s.r.o.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 package io.atomix.storage.journal;
18
19 import static org.junit.Assert.assertEquals;
20 import static org.junit.Assert.assertFalse;
21 import static org.junit.Assert.assertNotNull;
22 import static org.junit.Assert.assertNull;
23 import static org.junit.Assert.assertTrue;
24
25 import java.io.IOException;
26 import java.nio.file.FileVisitResult;
27 import java.nio.file.Files;
28 import java.nio.file.Path;
29 import java.nio.file.Paths;
30 import java.nio.file.SimpleFileVisitor;
31 import java.nio.file.attribute.BasicFileAttributes;
32 import java.util.ArrayList;
33 import java.util.List;
34 import org.junit.After;
35 import org.junit.Before;
36 import org.junit.Test;
37 import org.junit.runner.RunWith;
38 import org.junit.runners.Parameterized;
39
40 /**
41  * Base journal test.
42  *
43  * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
44  */
45 @RunWith(Parameterized.class)
46 public abstract class AbstractJournalTest {
47   private static final JournalSerdes NAMESPACE = JournalSerdes.builder()
48       .register(new TestEntrySerdes(), TestEntry.class)
49       .register(new ByteArraySerdes(), byte[].class)
50       .build();
51
52   protected static final TestEntry ENTRY = new TestEntry(32);
53   private static final Path PATH = Paths.get("target/test-logs/");
54
55   private final int maxSegmentSize;
56   protected final int entriesPerSegment;
57
58   protected AbstractJournalTest(final int maxSegmentSize) {
59     this.maxSegmentSize = maxSegmentSize;
60     int entryLength = NAMESPACE.serialize(ENTRY).length + 8;
61     entriesPerSegment = (maxSegmentSize - 64) / entryLength;
62   }
63
64   protected abstract StorageLevel storageLevel();
65
66   @Parameterized.Parameters
67   public static List<Object[]> primeNumbers() {
68     List<Object[]> runs = new ArrayList<>();
69     for (int i = 1; i <= 10; i++) {
70       for (int j = 1; j <= 10; j++) {
71         runs.add(new Object[]{64 + i * (NAMESPACE.serialize(ENTRY).length + 8) + j});
72       }
73     }
74     return runs;
75   }
76
77   protected SegmentedJournal<TestEntry> createJournal() {
78     return SegmentedJournal.<TestEntry>builder()
79         .withName("test")
80         .withDirectory(PATH.toFile())
81         .withNamespace(NAMESPACE)
82         .withStorageLevel(storageLevel())
83         .withMaxSegmentSize(maxSegmentSize)
84         .withIndexDensity(.2)
85         .build();
86   }
87
88   @Test
89   public void testCloseMultipleTimes() {
90     // given
91     final Journal<TestEntry> journal = createJournal();
92
93     // when
94     journal.close();
95
96     // then
97     journal.close();
98   }
99
100   @Test
101   public void testWriteRead() throws Exception {
102     try (Journal<TestEntry> journal = createJournal()) {
103       JournalWriter<TestEntry> writer = journal.writer();
104       JournalReader<TestEntry> reader = journal.openReader(1);
105
106       // Append a couple entries.
107       Indexed<TestEntry> indexed;
108       assertEquals(1, writer.getNextIndex());
109       indexed = writer.append(ENTRY);
110       assertEquals(1, indexed.index());
111
112       assertEquals(2, writer.getNextIndex());
113       writer.append(new Indexed<>(2, ENTRY, 0));
114       reader.reset(2);
115       indexed = reader.next();
116       assertEquals(2, indexed.index());
117       assertFalse(reader.hasNext());
118
119       // Test reading an entry
120       Indexed<TestEntry> entry1;
121       reader.reset();
122       entry1 = reader.next();
123       assertEquals(1, entry1.index());
124       assertEquals(entry1, reader.getCurrentEntry());
125       assertEquals(1, reader.getCurrentIndex());
126
127       // Test reading a second entry
128       Indexed<TestEntry> entry2;
129       assertTrue(reader.hasNext());
130       assertEquals(2, reader.getNextIndex());
131       entry2 = reader.next();
132       assertEquals(2, entry2.index());
133       assertEquals(entry2, reader.getCurrentEntry());
134       assertEquals(2, reader.getCurrentIndex());
135       assertFalse(reader.hasNext());
136
137       // Test opening a new reader and reading from the journal.
138       reader = journal.openReader(1);
139       assertTrue(reader.hasNext());
140       entry1 = reader.next();
141       assertEquals(1, entry1.index());
142       assertEquals(entry1, reader.getCurrentEntry());
143       assertEquals(1, reader.getCurrentIndex());
144       assertTrue(reader.hasNext());
145
146       assertTrue(reader.hasNext());
147       assertEquals(2, reader.getNextIndex());
148       entry2 = reader.next();
149       assertEquals(2, entry2.index());
150       assertEquals(entry2, reader.getCurrentEntry());
151       assertEquals(2, reader.getCurrentIndex());
152       assertFalse(reader.hasNext());
153
154       // Reset the reader.
155       reader.reset();
156
157       // Test opening a new reader and reading from the journal.
158       reader = journal.openReader(1);
159       assertTrue(reader.hasNext());
160       entry1 = reader.next();
161       assertEquals(1, entry1.index());
162       assertEquals(entry1, reader.getCurrentEntry());
163       assertEquals(1, reader.getCurrentIndex());
164       assertTrue(reader.hasNext());
165
166       assertTrue(reader.hasNext());
167       assertEquals(2, reader.getNextIndex());
168       entry2 = reader.next();
169       assertEquals(2, entry2.index());
170       assertEquals(entry2, reader.getCurrentEntry());
171       assertEquals(2, reader.getCurrentIndex());
172       assertFalse(reader.hasNext());
173
174       // Truncate the journal and write a different entry.
175       writer.truncate(1);
176       assertEquals(2, writer.getNextIndex());
177       writer.append(new Indexed<>(2, ENTRY, 0));
178       reader.reset(2);
179       indexed = reader.next();
180       assertEquals(2, indexed.index());
181
182       // Reset the reader to a specific index and read the last entry again.
183       reader.reset(2);
184
185       assertNotNull(reader.getCurrentEntry());
186       assertEquals(1, reader.getCurrentIndex());
187       assertEquals(1, reader.getCurrentEntry().index());
188       assertTrue(reader.hasNext());
189       assertEquals(2, reader.getNextIndex());
190       entry2 = reader.next();
191       assertEquals(2, entry2.index());
192       assertEquals(entry2, reader.getCurrentEntry());
193       assertEquals(2, reader.getCurrentIndex());
194       assertFalse(reader.hasNext());
195     }
196   }
197
198   @Test
199   public void testResetTruncateZero() throws Exception {
200     try (SegmentedJournal<TestEntry> journal = createJournal()) {
201       JournalWriter<TestEntry> writer = journal.writer();
202       JournalReader<TestEntry> reader = journal.openReader(1);
203
204       assertEquals(0, writer.getLastIndex());
205       writer.append(ENTRY);
206       writer.append(ENTRY);
207       writer.reset(1);
208       assertEquals(0, writer.getLastIndex());
209       writer.append(ENTRY);
210       assertEquals(1, reader.next().index());
211       writer.reset(1);
212       assertEquals(0, writer.getLastIndex());
213       writer.append(ENTRY);
214       assertEquals(1, writer.getLastIndex());
215       assertEquals(1, writer.getLastEntry().index());
216
217       assertTrue(reader.hasNext());
218       assertEquals(1, reader.next().index());
219
220       writer.truncate(0);
221       assertEquals(0, writer.getLastIndex());
222       assertNull(writer.getLastEntry());
223       writer.append(ENTRY);
224       assertEquals(1, writer.getLastIndex());
225       assertEquals(1, writer.getLastEntry().index());
226
227       assertTrue(reader.hasNext());
228       assertEquals(1, reader.next().index());
229     }
230   }
231
232   @Test
233   public void testTruncateRead() throws Exception {
234     int i = 10;
235     try (Journal<TestEntry> journal = createJournal()) {
236       JournalWriter<TestEntry> writer = journal.writer();
237       JournalReader<TestEntry> reader = journal.openReader(1);
238
239       for (int j = 1; j <= i; j++) {
240         assertEquals(j, writer.append(new TestEntry(32)).index());
241       }
242
243       for (int j = 1; j <= i - 2; j++) {
244         assertTrue(reader.hasNext());
245         assertEquals(j, reader.next().index());
246       }
247
248       writer.truncate(i - 2);
249
250       assertFalse(reader.hasNext());
251       assertEquals(i - 1, writer.append(new TestEntry(32)).index());
252       assertEquals(i, writer.append(new TestEntry(32)).index());
253
254       assertTrue(reader.hasNext());
255       Indexed<TestEntry> entry = reader.next();
256       assertEquals(i - 1, entry.index());
257       assertTrue(reader.hasNext());
258       entry = reader.next();
259       assertEquals(i, entry.index());
260     }
261   }
262
263   @Test
264   public void testWriteReadEntries() throws Exception {
265     try (Journal<TestEntry> journal = createJournal()) {
266       JournalWriter<TestEntry> writer = journal.writer();
267       JournalReader<TestEntry> reader = journal.openReader(1);
268
269       for (int i = 1; i <= entriesPerSegment * 5; i++) {
270         writer.append(ENTRY);
271         assertTrue(reader.hasNext());
272         Indexed<TestEntry> entry;
273         entry = reader.next();
274         assertEquals(i, entry.index());
275         assertEquals(32, entry.entry().bytes().length);
276         reader.reset(i);
277         entry = reader.next();
278         assertEquals(i, entry.index());
279         assertEquals(32, entry.entry().bytes().length);
280
281         if (i > 6) {
282           reader.reset(i - 5);
283           assertNotNull(reader.getCurrentEntry());
284           assertEquals(i - 6, reader.getCurrentIndex());
285           assertEquals(i - 6, reader.getCurrentEntry().index());
286           assertEquals(i - 5, reader.getNextIndex());
287           reader.reset(i + 1);
288         }
289
290         writer.truncate(i - 1);
291         writer.append(ENTRY);
292
293         assertTrue(reader.hasNext());
294         reader.reset(i);
295         assertTrue(reader.hasNext());
296         entry = reader.next();
297         assertEquals(i, entry.index());
298         assertEquals(32, entry.entry().bytes().length);
299       }
300     }
301   }
302
303   @Test
304   public void testWriteReadCommittedEntries() throws Exception {
305     try (Journal<TestEntry> journal = createJournal()) {
306       JournalWriter<TestEntry> writer = journal.writer();
307       JournalReader<TestEntry> reader = journal.openReader(1, JournalReader.Mode.COMMITS);
308
309       for (int i = 1; i <= entriesPerSegment * 5; i++) {
310         writer.append(ENTRY);
311         assertFalse(reader.hasNext());
312         writer.commit(i);
313         assertTrue(reader.hasNext());
314         Indexed<TestEntry> entry;
315         entry = reader.next();
316         assertEquals(i, entry.index());
317         assertEquals(32, entry.entry().bytes().length);
318         reader.reset(i);
319         entry = reader.next();
320         assertEquals(i, entry.index());
321         assertEquals(32, entry.entry().bytes().length);
322       }
323     }
324   }
325
326   @Test
327   public void testReadAfterCompact() throws Exception {
328     try (SegmentedJournal<TestEntry> journal = createJournal()) {
329       JournalWriter<TestEntry> writer = journal.writer();
330       JournalReader<TestEntry> uncommittedReader = journal.openReader(1, JournalReader.Mode.ALL);
331       JournalReader<TestEntry> committedReader = journal.openReader(1, JournalReader.Mode.COMMITS);
332
333       for (int i = 1; i <= entriesPerSegment * 10; i++) {
334         assertEquals(i, writer.append(ENTRY).index());
335       }
336
337       assertEquals(1, uncommittedReader.getNextIndex());
338       assertTrue(uncommittedReader.hasNext());
339       assertEquals(1, committedReader.getNextIndex());
340       assertFalse(committedReader.hasNext());
341
342       writer.commit(entriesPerSegment * 9);
343
344       assertTrue(uncommittedReader.hasNext());
345       assertTrue(committedReader.hasNext());
346
347       for (int i = 1; i <= entriesPerSegment * 2.5; i++) {
348         assertEquals(i, uncommittedReader.next().index());
349         assertEquals(i, committedReader.next().index());
350       }
351
352       journal.compact(entriesPerSegment * 5 + 1);
353
354       assertNull(uncommittedReader.getCurrentEntry());
355       assertEquals(0, uncommittedReader.getCurrentIndex());
356       assertTrue(uncommittedReader.hasNext());
357       assertEquals(entriesPerSegment * 5 + 1, uncommittedReader.getNextIndex());
358       assertEquals(entriesPerSegment * 5 + 1, uncommittedReader.next().index());
359
360       assertNull(committedReader.getCurrentEntry());
361       assertEquals(0, committedReader.getCurrentIndex());
362       assertTrue(committedReader.hasNext());
363       assertEquals(entriesPerSegment * 5 + 1, committedReader.getNextIndex());
364       assertEquals(entriesPerSegment * 5 + 1, committedReader.next().index());
365     }
366   }
367
368   @Before
369   @After
370   public void cleanupStorage() throws IOException {
371     if (Files.exists(PATH)) {
372       Files.walkFileTree(PATH, new SimpleFileVisitor<Path>() {
373         @Override
374         public FileVisitResult visitFile(final Path file, final BasicFileAttributes attrs) throws IOException {
375           Files.delete(file);
376           return FileVisitResult.CONTINUE;
377         }
378
379         @Override
380         public FileVisitResult postVisitDirectory(final Path dir, final IOException exc) throws IOException {
381           Files.delete(dir);
382           return FileVisitResult.CONTINUE;
383         }
384       });
385     }
386   }
387 }