07fcd76a6bc060e86f57cb41ad9930d79f3e6c80
[controller.git] / third-party / atomix / storage / src / test / java / io / atomix / storage / journal / AbstractJournalTest.java
1 /*
2  * Copyright 2017-2021 Open Networking Foundation
3  * Copyright 2023 PANTHEON.tech, s.r.o.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 package io.atomix.storage.journal;
18
19 import static org.junit.Assert.assertEquals;
20 import static org.junit.Assert.assertFalse;
21 import static org.junit.Assert.assertNotNull;
22 import static org.junit.Assert.assertNull;
23 import static org.junit.Assert.assertTrue;
24
25 import com.esotericsoftware.kryo.serializers.DefaultArraySerializers.ByteArraySerializer;
26 import java.io.IOException;
27 import java.nio.file.FileVisitResult;
28 import java.nio.file.Files;
29 import java.nio.file.Path;
30 import java.nio.file.Paths;
31 import java.nio.file.SimpleFileVisitor;
32 import java.nio.file.attribute.BasicFileAttributes;
33 import java.util.ArrayList;
34 import java.util.List;
35 import org.junit.After;
36 import org.junit.Before;
37 import org.junit.Test;
38 import org.junit.runner.RunWith;
39 import org.junit.runners.Parameterized;
40
41 /**
42  * Base journal test.
43  *
44  * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
45  */
46 @RunWith(Parameterized.class)
47 public abstract class AbstractJournalTest {
48   private static final JournalSerdes NAMESPACE = JournalSerdes.builder()
49       .register(new TestEntrySerializer(), TestEntry.class)
50       .register(new ByteArraySerializer(), byte[].class)
51       .build();
52
53   protected static final TestEntry ENTRY = new TestEntry(32);
54   private static final Path PATH = Paths.get("target/test-logs/");
55
56   private final int maxSegmentSize;
57   protected final int entriesPerSegment;
58
59   protected AbstractJournalTest(final int maxSegmentSize) {
60     this.maxSegmentSize = maxSegmentSize;
61     int entryLength = NAMESPACE.serialize(ENTRY).length + 8;
62     entriesPerSegment = (maxSegmentSize - 64) / entryLength;
63   }
64
65   protected abstract StorageLevel storageLevel();
66
67   @Parameterized.Parameters
68   public static List<Object[]> primeNumbers() {
69     List<Object[]> runs = new ArrayList<>();
70     for (int i = 1; i <= 10; i++) {
71       for (int j = 1; j <= 10; j++) {
72         runs.add(new Object[]{64 + i * (NAMESPACE.serialize(ENTRY).length + 8) + j});
73       }
74     }
75     return runs;
76   }
77
78   protected SegmentedJournal<TestEntry> createJournal() {
79     return SegmentedJournal.<TestEntry>builder()
80         .withName("test")
81         .withDirectory(PATH.toFile())
82         .withNamespace(NAMESPACE)
83         .withStorageLevel(storageLevel())
84         .withMaxSegmentSize(maxSegmentSize)
85         .withIndexDensity(.2)
86         .build();
87   }
88
89   @Test
90   public void testCloseMultipleTimes() {
91     // given
92     final Journal<TestEntry> journal = createJournal();
93
94     // when
95     journal.close();
96
97     // then
98     journal.close();
99   }
100
101   @Test
102   public void testWriteRead() throws Exception {
103     try (Journal<TestEntry> journal = createJournal()) {
104       JournalWriter<TestEntry> writer = journal.writer();
105       JournalReader<TestEntry> reader = journal.openReader(1);
106
107       // Append a couple entries.
108       Indexed<TestEntry> indexed;
109       assertEquals(1, writer.getNextIndex());
110       indexed = writer.append(ENTRY);
111       assertEquals(1, indexed.index());
112
113       assertEquals(2, writer.getNextIndex());
114       writer.append(new Indexed<>(2, ENTRY, 0));
115       reader.reset(2);
116       indexed = reader.next();
117       assertEquals(2, indexed.index());
118       assertFalse(reader.hasNext());
119
120       // Test reading an entry
121       Indexed<TestEntry> entry1;
122       reader.reset();
123       entry1 = reader.next();
124       assertEquals(1, entry1.index());
125       assertEquals(entry1, reader.getCurrentEntry());
126       assertEquals(1, reader.getCurrentIndex());
127
128       // Test reading a second entry
129       Indexed<TestEntry> entry2;
130       assertTrue(reader.hasNext());
131       assertEquals(2, reader.getNextIndex());
132       entry2 = reader.next();
133       assertEquals(2, entry2.index());
134       assertEquals(entry2, reader.getCurrentEntry());
135       assertEquals(2, reader.getCurrentIndex());
136       assertFalse(reader.hasNext());
137
138       // Test opening a new reader and reading from the journal.
139       reader = journal.openReader(1);
140       assertTrue(reader.hasNext());
141       entry1 = reader.next();
142       assertEquals(1, entry1.index());
143       assertEquals(entry1, reader.getCurrentEntry());
144       assertEquals(1, reader.getCurrentIndex());
145       assertTrue(reader.hasNext());
146
147       assertTrue(reader.hasNext());
148       assertEquals(2, reader.getNextIndex());
149       entry2 = reader.next();
150       assertEquals(2, entry2.index());
151       assertEquals(entry2, reader.getCurrentEntry());
152       assertEquals(2, reader.getCurrentIndex());
153       assertFalse(reader.hasNext());
154
155       // Reset the reader.
156       reader.reset();
157
158       // Test opening a new reader and reading from the journal.
159       reader = journal.openReader(1);
160       assertTrue(reader.hasNext());
161       entry1 = reader.next();
162       assertEquals(1, entry1.index());
163       assertEquals(entry1, reader.getCurrentEntry());
164       assertEquals(1, reader.getCurrentIndex());
165       assertTrue(reader.hasNext());
166
167       assertTrue(reader.hasNext());
168       assertEquals(2, reader.getNextIndex());
169       entry2 = reader.next();
170       assertEquals(2, entry2.index());
171       assertEquals(entry2, reader.getCurrentEntry());
172       assertEquals(2, reader.getCurrentIndex());
173       assertFalse(reader.hasNext());
174
175       // Truncate the journal and write a different entry.
176       writer.truncate(1);
177       assertEquals(2, writer.getNextIndex());
178       writer.append(new Indexed<>(2, ENTRY, 0));
179       reader.reset(2);
180       indexed = reader.next();
181       assertEquals(2, indexed.index());
182
183       // Reset the reader to a specific index and read the last entry again.
184       reader.reset(2);
185
186       assertNotNull(reader.getCurrentEntry());
187       assertEquals(1, reader.getCurrentIndex());
188       assertEquals(1, reader.getCurrentEntry().index());
189       assertTrue(reader.hasNext());
190       assertEquals(2, reader.getNextIndex());
191       entry2 = reader.next();
192       assertEquals(2, entry2.index());
193       assertEquals(entry2, reader.getCurrentEntry());
194       assertEquals(2, reader.getCurrentIndex());
195       assertFalse(reader.hasNext());
196     }
197   }
198
199   @Test
200   public void testResetTruncateZero() throws Exception {
201     try (SegmentedJournal<TestEntry> journal = createJournal()) {
202       JournalWriter<TestEntry> writer = journal.writer();
203       JournalReader<TestEntry> reader = journal.openReader(1);
204
205       assertEquals(0, writer.getLastIndex());
206       writer.append(ENTRY);
207       writer.append(ENTRY);
208       writer.reset(1);
209       assertEquals(0, writer.getLastIndex());
210       writer.append(ENTRY);
211       assertEquals(1, reader.next().index());
212       writer.reset(1);
213       assertEquals(0, writer.getLastIndex());
214       writer.append(ENTRY);
215       assertEquals(1, writer.getLastIndex());
216       assertEquals(1, writer.getLastEntry().index());
217
218       assertTrue(reader.hasNext());
219       assertEquals(1, reader.next().index());
220
221       writer.truncate(0);
222       assertEquals(0, writer.getLastIndex());
223       assertNull(writer.getLastEntry());
224       writer.append(ENTRY);
225       assertEquals(1, writer.getLastIndex());
226       assertEquals(1, writer.getLastEntry().index());
227
228       assertTrue(reader.hasNext());
229       assertEquals(1, reader.next().index());
230     }
231   }
232
233   @Test
234   public void testTruncateRead() throws Exception {
235     int i = 10;
236     try (Journal<TestEntry> journal = createJournal()) {
237       JournalWriter<TestEntry> writer = journal.writer();
238       JournalReader<TestEntry> reader = journal.openReader(1);
239
240       for (int j = 1; j <= i; j++) {
241         assertEquals(j, writer.append(new TestEntry(32)).index());
242       }
243
244       for (int j = 1; j <= i - 2; j++) {
245         assertTrue(reader.hasNext());
246         assertEquals(j, reader.next().index());
247       }
248
249       writer.truncate(i - 2);
250
251       assertFalse(reader.hasNext());
252       assertEquals(i - 1, writer.append(new TestEntry(32)).index());
253       assertEquals(i, writer.append(new TestEntry(32)).index());
254
255       assertTrue(reader.hasNext());
256       Indexed<TestEntry> entry = reader.next();
257       assertEquals(i - 1, entry.index());
258       assertTrue(reader.hasNext());
259       entry = reader.next();
260       assertEquals(i, entry.index());
261     }
262   }
263
264   @Test
265   public void testWriteReadEntries() throws Exception {
266     try (Journal<TestEntry> journal = createJournal()) {
267       JournalWriter<TestEntry> writer = journal.writer();
268       JournalReader<TestEntry> reader = journal.openReader(1);
269
270       for (int i = 1; i <= entriesPerSegment * 5; i++) {
271         writer.append(ENTRY);
272         assertTrue(reader.hasNext());
273         Indexed<TestEntry> entry;
274         entry = reader.next();
275         assertEquals(i, entry.index());
276         assertEquals(32, entry.entry().bytes().length);
277         reader.reset(i);
278         entry = reader.next();
279         assertEquals(i, entry.index());
280         assertEquals(32, entry.entry().bytes().length);
281
282         if (i > 6) {
283           reader.reset(i - 5);
284           assertNotNull(reader.getCurrentEntry());
285           assertEquals(i - 6, reader.getCurrentIndex());
286           assertEquals(i - 6, reader.getCurrentEntry().index());
287           assertEquals(i - 5, reader.getNextIndex());
288           reader.reset(i + 1);
289         }
290
291         writer.truncate(i - 1);
292         writer.append(ENTRY);
293
294         assertTrue(reader.hasNext());
295         reader.reset(i);
296         assertTrue(reader.hasNext());
297         entry = reader.next();
298         assertEquals(i, entry.index());
299         assertEquals(32, entry.entry().bytes().length);
300       }
301     }
302   }
303
304   @Test
305   public void testWriteReadCommittedEntries() throws Exception {
306     try (Journal<TestEntry> journal = createJournal()) {
307       JournalWriter<TestEntry> writer = journal.writer();
308       JournalReader<TestEntry> reader = journal.openReader(1, JournalReader.Mode.COMMITS);
309
310       for (int i = 1; i <= entriesPerSegment * 5; i++) {
311         writer.append(ENTRY);
312         assertFalse(reader.hasNext());
313         writer.commit(i);
314         assertTrue(reader.hasNext());
315         Indexed<TestEntry> entry;
316         entry = reader.next();
317         assertEquals(i, entry.index());
318         assertEquals(32, entry.entry().bytes().length);
319         reader.reset(i);
320         entry = reader.next();
321         assertEquals(i, entry.index());
322         assertEquals(32, entry.entry().bytes().length);
323       }
324     }
325   }
326
327   @Test
328   public void testReadAfterCompact() throws Exception {
329     try (SegmentedJournal<TestEntry> journal = createJournal()) {
330       JournalWriter<TestEntry> writer = journal.writer();
331       JournalReader<TestEntry> uncommittedReader = journal.openReader(1, JournalReader.Mode.ALL);
332       JournalReader<TestEntry> committedReader = journal.openReader(1, JournalReader.Mode.COMMITS);
333
334       for (int i = 1; i <= entriesPerSegment * 10; i++) {
335         assertEquals(i, writer.append(ENTRY).index());
336       }
337
338       assertEquals(1, uncommittedReader.getNextIndex());
339       assertTrue(uncommittedReader.hasNext());
340       assertEquals(1, committedReader.getNextIndex());
341       assertFalse(committedReader.hasNext());
342
343       writer.commit(entriesPerSegment * 9);
344
345       assertTrue(uncommittedReader.hasNext());
346       assertTrue(committedReader.hasNext());
347
348       for (int i = 1; i <= entriesPerSegment * 2.5; i++) {
349         assertEquals(i, uncommittedReader.next().index());
350         assertEquals(i, committedReader.next().index());
351       }
352
353       journal.compact(entriesPerSegment * 5 + 1);
354
355       assertNull(uncommittedReader.getCurrentEntry());
356       assertEquals(0, uncommittedReader.getCurrentIndex());
357       assertTrue(uncommittedReader.hasNext());
358       assertEquals(entriesPerSegment * 5 + 1, uncommittedReader.getNextIndex());
359       assertEquals(entriesPerSegment * 5 + 1, uncommittedReader.next().index());
360
361       assertNull(committedReader.getCurrentEntry());
362       assertEquals(0, committedReader.getCurrentIndex());
363       assertTrue(committedReader.hasNext());
364       assertEquals(entriesPerSegment * 5 + 1, committedReader.getNextIndex());
365       assertEquals(entriesPerSegment * 5 + 1, committedReader.next().index());
366     }
367   }
368
369   @Before
370   @After
371   public void cleanupStorage() throws IOException {
372     if (Files.exists(PATH)) {
373       Files.walkFileTree(PATH, new SimpleFileVisitor<Path>() {
374         @Override
375         public FileVisitResult visitFile(final Path file, final BasicFileAttributes attrs) throws IOException {
376           Files.delete(file);
377           return FileVisitResult.CONTINUE;
378         }
379
380         @Override
381         public FileVisitResult postVisitDirectory(final Path dir, final IOException exc) throws IOException {
382           Files.delete(dir);
383           return FileVisitResult.CONTINUE;
384         }
385       });
386     }
387   }
388 }