2 * Copyright 2017-2021 Open Networking Foundation
3 * Copyright 2023 PANTHEON.tech, s.r.o.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 package io.atomix.storage.journal;
19 import static org.junit.Assert.assertEquals;
20 import static org.junit.Assert.assertFalse;
21 import static org.junit.Assert.assertNotNull;
22 import static org.junit.Assert.assertNull;
23 import static org.junit.Assert.assertThrows;
24 import static org.junit.Assert.assertTrue;
26 import java.io.IOException;
27 import java.nio.file.FileVisitResult;
28 import java.nio.file.Files;
29 import java.nio.file.Path;
30 import java.nio.file.Paths;
31 import java.nio.file.SimpleFileVisitor;
32 import java.nio.file.attribute.BasicFileAttributes;
33 import java.util.ArrayList;
34 import java.util.List;
35 import java.util.NoSuchElementException;
36 import org.junit.After;
37 import org.junit.Before;
38 import org.junit.Test;
39 import org.junit.runner.RunWith;
40 import org.junit.runners.Parameterized;
45 * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
47 @RunWith(Parameterized.class)
48 public abstract class AbstractJournalTest {
49 private static final JournalSerdes NAMESPACE = JournalSerdes.builder()
50 .register(new TestEntrySerdes(), TestEntry.class)
51 .register(new ByteArraySerdes(), byte[].class)
54 protected static final TestEntry ENTRY = new TestEntry(32);
55 private static final Path PATH = Paths.get("target/test-logs/");
57 private final StorageLevel storageLevel;
58 private final int maxSegmentSize;
59 protected final int entriesPerSegment;
61 protected AbstractJournalTest(final StorageLevel storageLevel, final int maxSegmentSize) {
62 this.storageLevel = storageLevel;
63 this.maxSegmentSize = maxSegmentSize;
64 int entryLength = NAMESPACE.serialize(ENTRY).length + 8;
65 entriesPerSegment = (maxSegmentSize - 64) / entryLength;
68 @Parameterized.Parameters
69 public static List<Object[]> primeNumbers() {
70 var runs = new ArrayList<Object[]>();
71 for (int i = 1; i <= 10; i++) {
72 for (int j = 1; j <= 10; j++) {
73 runs.add(new Object[] { 64 + i * (NAMESPACE.serialize(ENTRY).length + 8) + j });
79 protected SegmentedJournal<TestEntry> createJournal() {
80 return SegmentedJournal.<TestEntry>builder()
82 .withDirectory(PATH.toFile())
83 .withNamespace(NAMESPACE)
84 .withStorageLevel(storageLevel)
85 .withMaxSegmentSize(maxSegmentSize)
91 public void testCloseMultipleTimes() {
93 final Journal<TestEntry> journal = createJournal();
103 public void testWriteRead() throws Exception {
104 try (Journal<TestEntry> journal = createJournal()) {
105 JournalWriter<TestEntry> writer = journal.writer();
106 JournalReader<TestEntry> reader = journal.openReader(1);
108 // Append a couple entries.
109 Indexed<TestEntry> indexed;
110 assertEquals(1, writer.getNextIndex());
111 indexed = writer.append(ENTRY);
112 assertEquals(1, indexed.index());
114 assertEquals(2, writer.getNextIndex());
115 writer.append(ENTRY);
117 indexed = reader.next();
118 assertEquals(2, indexed.index());
119 assertFalse(reader.hasNext());
121 // Test reading an entry
122 Indexed<TestEntry> entry1;
124 entry1 = reader.next();
125 assertEquals(1, entry1.index());
126 assertEquals(entry1, reader.getCurrentEntry());
127 assertEquals(1, reader.getCurrentIndex());
129 // Test reading a second entry
130 Indexed<TestEntry> entry2;
131 assertTrue(reader.hasNext());
132 assertEquals(2, reader.getNextIndex());
133 entry2 = reader.next();
134 assertEquals(2, entry2.index());
135 assertEquals(entry2, reader.getCurrentEntry());
136 assertEquals(2, reader.getCurrentIndex());
137 assertFalse(reader.hasNext());
139 // Test opening a new reader and reading from the journal.
140 reader = journal.openReader(1);
141 assertTrue(reader.hasNext());
142 entry1 = reader.next();
143 assertEquals(1, entry1.index());
144 assertEquals(entry1, reader.getCurrentEntry());
145 assertEquals(1, reader.getCurrentIndex());
146 assertTrue(reader.hasNext());
148 assertTrue(reader.hasNext());
149 assertEquals(2, reader.getNextIndex());
150 entry2 = reader.next();
151 assertEquals(2, entry2.index());
152 assertEquals(entry2, reader.getCurrentEntry());
153 assertEquals(2, reader.getCurrentIndex());
154 assertFalse(reader.hasNext());
159 // Test opening a new reader and reading from the journal.
160 reader = journal.openReader(1);
161 assertTrue(reader.hasNext());
162 entry1 = reader.next();
163 assertEquals(1, entry1.index());
164 assertEquals(entry1, reader.getCurrentEntry());
165 assertEquals(1, reader.getCurrentIndex());
166 assertTrue(reader.hasNext());
168 assertTrue(reader.hasNext());
169 assertEquals(2, reader.getNextIndex());
170 entry2 = reader.next();
171 assertEquals(2, entry2.index());
172 assertEquals(entry2, reader.getCurrentEntry());
173 assertEquals(2, reader.getCurrentIndex());
174 assertFalse(reader.hasNext());
176 // Truncate the journal and write a different entry.
178 assertEquals(2, writer.getNextIndex());
179 writer.append(ENTRY);
181 indexed = reader.next();
182 assertEquals(2, indexed.index());
184 // Reset the reader to a specific index and read the last entry again.
187 assertNotNull(reader.getCurrentEntry());
188 assertEquals(1, reader.getCurrentIndex());
189 assertEquals(1, reader.getCurrentEntry().index());
190 assertTrue(reader.hasNext());
191 assertEquals(2, reader.getNextIndex());
192 entry2 = reader.next();
193 assertEquals(2, entry2.index());
194 assertEquals(entry2, reader.getCurrentEntry());
195 assertEquals(2, reader.getCurrentIndex());
196 assertFalse(reader.hasNext());
201 public void testResetTruncateZero() throws Exception {
202 try (SegmentedJournal<TestEntry> journal = createJournal()) {
203 JournalWriter<TestEntry> writer = journal.writer();
204 JournalReader<TestEntry> reader = journal.openReader(1);
206 assertEquals(0, writer.getLastIndex());
207 writer.append(ENTRY);
208 writer.append(ENTRY);
210 assertEquals(0, writer.getLastIndex());
211 writer.append(ENTRY);
212 assertEquals(1, reader.next().index());
214 assertEquals(0, writer.getLastIndex());
215 writer.append(ENTRY);
216 assertEquals(1, writer.getLastIndex());
217 assertEquals(1, writer.getLastEntry().index());
219 assertTrue(reader.hasNext());
220 assertEquals(1, reader.next().index());
223 assertEquals(0, writer.getLastIndex());
224 assertNull(writer.getLastEntry());
225 writer.append(ENTRY);
226 assertEquals(1, writer.getLastIndex());
227 assertEquals(1, writer.getLastEntry().index());
229 assertTrue(reader.hasNext());
230 assertEquals(1, reader.next().index());
235 public void testTruncateRead() throws Exception {
237 try (Journal<TestEntry> journal = createJournal()) {
238 JournalWriter<TestEntry> writer = journal.writer();
239 JournalReader<TestEntry> reader = journal.openReader(1);
241 for (int j = 1; j <= i; j++) {
242 assertEquals(j, writer.append(new TestEntry(32)).index());
245 for (int j = 1; j <= i - 2; j++) {
246 assertTrue(reader.hasNext());
247 assertEquals(j, reader.next().index());
250 writer.truncate(i - 2);
252 assertFalse(reader.hasNext());
253 assertEquals(i - 1, writer.append(new TestEntry(32)).index());
254 assertEquals(i, writer.append(new TestEntry(32)).index());
256 assertTrue(reader.hasNext());
257 Indexed<TestEntry> entry = reader.next();
258 assertEquals(i - 1, entry.index());
259 assertTrue(reader.hasNext());
260 entry = reader.next();
261 assertEquals(i, entry.index());
266 public void testWriteReadEntries() throws Exception {
267 try (Journal<TestEntry> journal = createJournal()) {
268 JournalWriter<TestEntry> writer = journal.writer();
269 JournalReader<TestEntry> reader = journal.openReader(1);
271 for (int i = 1; i <= entriesPerSegment * 5; i++) {
272 writer.append(ENTRY);
273 assertTrue(reader.hasNext());
274 Indexed<TestEntry> entry;
275 entry = reader.next();
276 assertEquals(i, entry.index());
277 assertEquals(32, entry.entry().bytes().length);
279 entry = reader.next();
280 assertEquals(i, entry.index());
281 assertEquals(32, entry.entry().bytes().length);
285 assertNotNull(reader.getCurrentEntry());
286 assertEquals(i - 6, reader.getCurrentIndex());
287 assertEquals(i - 6, reader.getCurrentEntry().index());
288 assertEquals(i - 5, reader.getNextIndex());
292 writer.truncate(i - 1);
293 writer.append(ENTRY);
295 assertTrue(reader.hasNext());
297 assertTrue(reader.hasNext());
298 entry = reader.next();
299 assertEquals(i, entry.index());
300 assertEquals(32, entry.entry().bytes().length);
306 public void testWriteReadCommittedEntries() throws Exception {
307 try (Journal<TestEntry> journal = createJournal()) {
308 JournalWriter<TestEntry> writer = journal.writer();
309 JournalReader<TestEntry> reader = journal.openReader(1, JournalReader.Mode.COMMITS);
311 for (int i = 1; i <= entriesPerSegment * 5; i++) {
312 writer.append(ENTRY);
313 assertFalse(reader.hasNext());
315 assertTrue(reader.hasNext());
316 Indexed<TestEntry> entry;
317 entry = reader.next();
318 assertEquals(i, entry.index());
319 assertEquals(32, entry.entry().bytes().length);
321 entry = reader.next();
322 assertEquals(i, entry.index());
323 assertEquals(32, entry.entry().bytes().length);
328 // Same as testWriteReadCommittedEntries(), but does not use hasNext() but checks whether an exception is thrown
330 public void testWriteReadCommittedEntriesException() throws Exception {
331 try (Journal<TestEntry> journal = createJournal()) {
332 JournalWriter<TestEntry> writer = journal.writer();
333 JournalReader<TestEntry> reader = journal.openReader(1, JournalReader.Mode.COMMITS);
335 for (int i = 1; i <= entriesPerSegment * 5; i++) {
336 writer.append(ENTRY);
337 assertThrows(NoSuchElementException.class, reader::next);
339 Indexed<TestEntry> entry = reader.next();
340 assertEquals(i, entry.index());
341 assertEquals(32, entry.entry().bytes().length);
343 entry = reader.next();
344 assertEquals(i, entry.index());
345 assertEquals(32, entry.entry().bytes().length);
351 public void testReadAfterCompact() throws Exception {
352 try (SegmentedJournal<TestEntry> journal = createJournal()) {
353 JournalWriter<TestEntry> writer = journal.writer();
354 JournalReader<TestEntry> uncommittedReader = journal.openReader(1, JournalReader.Mode.ALL);
355 JournalReader<TestEntry> committedReader = journal.openReader(1, JournalReader.Mode.COMMITS);
357 for (int i = 1; i <= entriesPerSegment * 10; i++) {
358 assertEquals(i, writer.append(ENTRY).index());
361 assertEquals(1, uncommittedReader.getNextIndex());
362 assertTrue(uncommittedReader.hasNext());
363 assertEquals(1, committedReader.getNextIndex());
364 assertFalse(committedReader.hasNext());
366 writer.commit(entriesPerSegment * 9);
368 assertTrue(uncommittedReader.hasNext());
369 assertTrue(committedReader.hasNext());
371 for (int i = 1; i <= entriesPerSegment * 2.5; i++) {
372 assertEquals(i, uncommittedReader.next().index());
373 assertEquals(i, committedReader.next().index());
376 journal.compact(entriesPerSegment * 5 + 1);
378 assertNull(uncommittedReader.getCurrentEntry());
379 assertEquals(0, uncommittedReader.getCurrentIndex());
380 assertTrue(uncommittedReader.hasNext());
381 assertEquals(entriesPerSegment * 5 + 1, uncommittedReader.getNextIndex());
382 assertEquals(entriesPerSegment * 5 + 1, uncommittedReader.next().index());
384 assertNull(committedReader.getCurrentEntry());
385 assertEquals(0, committedReader.getCurrentIndex());
386 assertTrue(committedReader.hasNext());
387 assertEquals(entriesPerSegment * 5 + 1, committedReader.getNextIndex());
388 assertEquals(entriesPerSegment * 5 + 1, committedReader.next().index());
393 * Tests reading from a compacted journal.
396 public void testCompactAndRecover() throws Exception {
397 try (var journal = createJournal()) {
398 // Write three segments to the journal.
399 final var writer = journal.writer();
400 for (int i = 0; i < entriesPerSegment * 3; i++) {
401 writer.append(ENTRY);
404 // Commit the entries and compact the first segment.
405 writer.commit(entriesPerSegment * 3);
406 journal.compact(entriesPerSegment + 1);
409 // Reopen the journal and create a reader.
410 try (var journal = createJournal()) {
411 final var writer = journal.writer();
412 final var reader = journal.openReader(1, JournalReader.Mode.COMMITS);
413 writer.append(ENTRY);
414 writer.append(ENTRY);
415 writer.commit(entriesPerSegment * 3);
417 // Ensure the reader starts at the first physical index in the journal.
418 assertEquals(entriesPerSegment + 1, reader.getNextIndex());
419 assertEquals(reader.getFirstIndex(), reader.getNextIndex());
420 assertTrue(reader.hasNext());
421 assertEquals(entriesPerSegment + 1, reader.getNextIndex());
422 assertEquals(reader.getFirstIndex(), reader.getNextIndex());
423 assertEquals(entriesPerSegment + 1, reader.next().index());
429 public void cleanupStorage() throws IOException {
430 if (Files.exists(PATH)) {
431 Files.walkFileTree(PATH, new SimpleFileVisitor<Path>() {
433 public FileVisitResult visitFile(final Path file, final BasicFileAttributes attrs) throws IOException {
435 return FileVisitResult.CONTINUE;
439 public FileVisitResult postVisitDirectory(final Path dir, final IOException exc) throws IOException {
441 return FileVisitResult.CONTINUE;