2 * Copyright 2017-2021 Open Networking Foundation
3 * Copyright 2023 PANTHEON.tech, s.r.o.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 package io.atomix.storage.journal;
19 import static org.junit.Assert.assertArrayEquals;
20 import static org.junit.Assert.assertEquals;
21 import static org.junit.Assert.assertNotNull;
22 import static org.junit.Assert.assertNull;
24 import java.io.IOException;
25 import java.nio.file.FileVisitResult;
26 import java.nio.file.Files;
27 import java.nio.file.Path;
28 import java.nio.file.Paths;
29 import java.nio.file.SimpleFileVisitor;
30 import java.nio.file.attribute.BasicFileAttributes;
31 import java.util.ArrayList;
32 import java.util.List;
33 import org.eclipse.jdt.annotation.NonNull;
34 import org.eclipse.jdt.annotation.Nullable;
35 import org.junit.After;
36 import org.junit.Before;
37 import org.junit.Test;
38 import org.junit.runner.RunWith;
39 import org.junit.runners.Parameterized;
44 * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
46 @RunWith(Parameterized.class)
47 public abstract class AbstractJournalTest {
// Serialization registry shared by all tests: registers TestEntrySerdes for TestEntry and
// ByteArraySerdes for byte[]. It is flagged as deprecated for removal since 9.0.3.
// NOTE(review): the end of this builder chain is not visible in this chunk (the embedded
// numbering jumps from 51 to 54) - confirm the terminating call against the full file.
48 @Deprecated(forRemoval = true, since = "9.0.3")
49 private static final JournalSerdes NAMESPACE = JournalSerdes.builder()
50 .register(new TestEntrySerdes(), TestEntry.class)
51 .register(new ByteArraySerdes(), byte[].class)
// Entry appended by most tests. The argument 32 is presumably the payload size in bytes -
// TODO confirm against TestEntry.
54 protected static final TestEntry ENTRY = new TestEntry(32);
// Directory handed to the journal builder in createJournal() and walked by cleanupStorage().
55 private static final Path PATH = Paths.get("target/test-logs/");
// Storage level and maximum segment size supplied through the constructor and passed to the
// journal builder in createJournal().
57 private final StorageLevel storageLevel;
58 private final int maxSegmentSize;
// Computed in the constructor as (maxSegmentSize - 64) / (serialized ENTRY length + 8).
59 protected final int entriesPerSegment;
// Stores the storage level and maximum segment size, then derives entriesPerSegment from the
// serialized size of ENTRY.
//
// storageLevel   - storage level later passed to the journal builder
// maxSegmentSize - maximum segment size later passed to the journal builder
61 protected AbstractJournalTest(final StorageLevel storageLevel, final int maxSegmentSize) {
62 this.storageLevel = storageLevel;
63 this.maxSegmentSize = maxSegmentSize;
// Serialized ENTRY length plus 8; the 8 is presumably per-entry framing overhead - TODO confirm.
64 int entryLength = NAMESPACE.serialize(ENTRY).length + 8;
// The 64 subtracted here is presumably the segment header size - TODO confirm.
// Integer division rounds down.
65 entriesPerSegment = (maxSegmentSize - 64) / entryLength;
// Supplies the parameter sets for the Parameterized runner: one single-element Object[] for
// every (i, j) pair with i and j each running from 1 to 10, i.e. 100 entries in total.
// Each value is 64 + i * (serialized ENTRY length + 8) + j, built from the same terms the
// constructor uses, so entriesPerSegment evaluates to i whenever j is smaller than the
// per-entry length.
// NOTE(review): only one value per run is produced while the visible constructor takes a
// StorageLevel as well - presumably subclasses supply it; confirm.
// NOTE(review): the values are not selected for primality despite the method name.
// NOTE(review): the loop closers and the return statement are not visible in this chunk
// (embedded numbering jumps from 73 to 79).
68 @Parameterized.Parameters
69 public static List<Object[]> primeNumbers() {
70 var runs = new ArrayList<Object[]>();
71 for (int i = 1; i <= 10; i++) {
72 for (int j = 1; j <= 10; j++) {
73 runs.add(new Object[] { 64 + i * (NAMESPACE.serialize(ENTRY).length + 8) + j });
// Creates the journal under test: a SegmentedJournal of TestEntry wrapping a
// SegmentedByteBufJournal built with PATH as its directory plus the configured storage level
// and maximum segment size, using the mapper obtained from NAMESPACE.
// NOTE(review): the builder calls at embedded lines 81 and 85 are not visible in this chunk -
// confirm the remaining builder options against the full file.
79 private SegmentedJournal<TestEntry> createJournal() {
80 return new SegmentedJournal<>(SegmentedByteBufJournal.builder()
82 .withDirectory(PATH.toFile())
83 .withStorageLevel(storageLevel)
84 .withMaxSegmentSize(maxSegmentSize)
86 .build(), NAMESPACE.toMapper());
// Creates a journal through createJournal().
// NOTE(review): everything after the journal creation is not visible in this chunk (embedded
// numbering jumps from 92 to 102). Going by the method name the hidden part presumably closes
// the journal more than once - confirm against the full file.
90 public void testCloseMultipleTimes() {
92 final Journal<TestEntry> journal = createJournal();
// Appends ENTRY through the journal writer and checks the indices reported by the writer and by
// three readers, each opened at index 1.
// NOTE(review): this chunk has gaps in its embedded numbering inside the method (for example
// 117 -> 119 -> 121, 139 -> 144, 154 -> 156 and 162 -> 165). The reader and writer reset or
// truncate calls implied by the existing comments are not visible, so the visible statement
// sequence alone is not the complete test - verify against the full file.
102 public void testWriteRead() throws Exception {
103 try (var journal = createJournal()) {
104 final var writer = journal.writer();
105 final var reader = journal.openReader(1);
107 // Append a couple entries.
// A fresh journal hands out index 1 first.
108 assertEquals(1, writer.getNextIndex());
109 var indexed = writer.append(ENTRY);
110 assertEquals(1, indexed.index());
112 assertEquals(2, writer.getNextIndex());
113 writer.append(ENTRY);
115 indexed = assertNext(reader);
116 assertEquals(2, indexed.index());
117 assertNoNext(reader);
119 // Test reading an entry
121 var entry1 = assertNext(reader);
122 assertEquals(1, entry1.index());
124 // Test reading a second entry
125 assertEquals(2, reader.getNextIndex());
126 var entry2 = assertNext(reader);
127 assertEquals(2, entry2.index());
// After consuming both entries the reader points one past the last written index.
128 assertEquals(3, reader.getNextIndex());
129 assertNoNext(reader);
131 // Test opening a new reader and reading from the journal.
132 final var reader2 = journal.openReader(1);
133 entry1 = assertNext(reader2);
134 assertEquals(1, entry1.index());
136 assertEquals(2, reader2.getNextIndex());
137 entry2 = assertNext(reader2);
138 assertEquals(2, entry2.index());
139 assertNoNext(reader2);
144 // Test opening a new reader and reading from the journal.
145 final var reader3 = journal.openReader(1);
146 entry1 = assertNext(reader3);
147 assertEquals(1, entry1.index());
149 assertEquals(2, reader3.getNextIndex());
150 entry2 = assertNext(reader3);
151 assertEquals(2, entry2.index());
152 assertNoNext(reader3);
154 // Truncate the journal and write a different entry.
// NOTE(review): the truncating call itself (embedded line 155) is not visible here.
156 assertEquals(2, writer.getNextIndex());
157 writer.append(ENTRY);
159 indexed = assertNext(reader3);
160 assertEquals(2, indexed.index());
162 // Reset the reader to a specific index and read the last entry again.
// NOTE(review): the reader reset call (embedded lines 163-164) is not visible here.
165 assertEquals(2, reader3.getNextIndex());
166 entry2 = assertNext(reader3);
167 assertEquals(2, entry2.index());
168 assertNoNext(reader3);
// Repeatedly checks the journal around its empty state: journal.lastIndex() is expected to be 0
// and writer.getNextIndex() to be 1, then ENTRY is appended and read back with index 1.
// NOTE(review): the calls that bring the journal back to the empty state between the
// assertions are not visible in this chunk (embedded numbering skips for example 182-183, 187,
// 195 and 205-206). Going by the method name they are presumably writer resets/truncations to
// index zero - confirm against the full file.
173 public void testResetTruncateZero() throws Exception {
174 try (var journal = createJournal()) {
175 final var writer = journal.writer();
176 final var reader = journal.openReader(1);
// Initial state of a fresh journal.
178 assertEquals(0, journal.lastIndex());
179 assertEquals(1, writer.getNextIndex());
180 writer.append(ENTRY);
181 writer.append(ENTRY);
184 assertEquals(0, journal.lastIndex());
185 assertEquals(1, writer.getNextIndex());
186 // Repeat to assert this is a no-op
188 assertEquals(0, journal.lastIndex());
189 assertEquals(1, writer.getNextIndex());
191 writer.append(ENTRY);
193 var indexed = assertNext(reader);
194 assertEquals(1, indexed.index());
196 assertEquals(0, journal.lastIndex());
197 assertEquals(1, writer.getNextIndex());
// Appending from the empty state yields index 1 and advances lastIndex/nextIndex by one.
198 indexed = writer.append(ENTRY);
199 assertEquals(1, journal.lastIndex());
200 assertEquals(2, writer.getNextIndex());
201 assertEquals(1, indexed.index());
203 indexed = assertNext(reader);
204 assertEquals(1, indexed.index());
// Same sequence once more after another (not visible) return to the empty state.
207 assertEquals(0, journal.lastIndex());
208 assertEquals(1, writer.getNextIndex());
209 indexed = writer.append(ENTRY);
210 assertEquals(1, journal.lastIndex());
211 assertEquals(2, writer.getNextIndex());
212 assertEquals(1, indexed.index());
214 indexed = assertNext(reader);
215 assertEquals(1, indexed.index());
// Writes cnt entries, reads all but the last two, resets the writer to cnt - 1, checks that the
// reader has nothing further to read, then appends two entries and verifies the reader returns
// them with indices cnt - 1 and cnt.
// NOTE(review): the declaration of cnt (embedded line 221) and the loop closers are not
// visible in this chunk - confirm the value of cnt against the full file.
220 public void testTruncateRead() throws Exception {
222 try (Journal<TestEntry> journal = createJournal()) {
223 JournalWriter<TestEntry> writer = journal.writer();
224 JournalReader<TestEntry> reader = journal.openReader(1);
// Fill the journal; each append is expected to return consecutive indices starting at 1.
226 for (int i = 1; i <= cnt; i++) {
227 assertEquals(i, writer.append(new TestEntry(32)).index());
// Consume everything except the final two entries.
230 for (int i = 1; i <= cnt - 2; i++) {
231 assertEquals(i, assertNext(reader).index());
// Reset the writer to cnt - 1; the following append is expected to reuse that index.
234 writer.reset(cnt - 1);
236 assertNoNext(reader);
237 assertEquals(cnt - 1, writer.append(new TestEntry(32)).index());
238 assertEquals(cnt, writer.append(new TestEntry(32)).index());
240 var entry = assertNext(reader);
241 assertEquals(cnt - 1, entry.index());
242 entry = assertNext(reader);
243 assertNotNull(entry);
244 assertEquals(cnt, entry.index());
// For i from 1 to entriesPerSegment * 5: appends ENTRY and checks that the reader returns an
// entry with index i whose bytes equal those of ENTRY. The visible body repeats that read
// check several times per iteration.
// NOTE(review): the statements that reposition the reader or writer between the repeated
// checks are not visible in this chunk (embedded numbering skips for example 259, 263-265,
// 267-271 and 273-275), including whatever guards the i - 5 assertion - confirm against the
// full file.
249 public void testWriteReadEntries() throws Exception {
250 try (Journal<TestEntry> journal = createJournal()) {
251 JournalWriter<TestEntry> writer = journal.writer();
252 JournalReader<TestEntry> reader = journal.openReader(1);
254 for (int i = 1; i <= entriesPerSegment * 5; i++) {
255 writer.append(ENTRY);
256 var entry = assertNext(reader);
257 assertEquals(i, entry.index());
258 assertArrayEquals(ENTRY.bytes(), entry.entry().bytes());
260 entry = assertNext(reader);
261 assertEquals(i, entry.index());
262 assertArrayEquals(ENTRY.bytes(), entry.entry().bytes());
// Expects the reader to be positioned five entries behind the current index at this point.
266 assertEquals(i - 5, reader.getNextIndex());
272 writer.append(ENTRY);
276 entry = assertNext(reader);
277 assertEquals(i, entry.index());
278 assertArrayEquals(ENTRY.bytes(), entry.entry().bytes());
// Uses a reader opened in Mode.COMMITS. For i from 1 to entriesPerSegment * 5 it appends ENTRY,
// asserts that the reader has nothing to return directly after the append, and then asserts
// that the entry with index i and the bytes of ENTRY is returned.
// NOTE(review): the statement between the two assertions (embedded line 292) is not visible -
// presumably a writer commit of index i. The step before the second read (embedded line 296)
// is not visible either. Confirm both against the full file.
284 public void testWriteReadCommittedEntries() throws Exception {
285 try (Journal<TestEntry> journal = createJournal()) {
286 JournalWriter<TestEntry> writer = journal.writer();
287 JournalReader<TestEntry> reader = journal.openReader(1, JournalReader.Mode.COMMITS);
289 for (int i = 1; i <= entriesPerSegment * 5; i++) {
290 writer.append(ENTRY);
// Directly after the append the COMMITS reader is expected to see nothing.
291 assertNoNext(reader);
293 var entry = assertNext(reader);
294 assertEquals(i, entry.index());
295 assertArrayEquals(ENTRY.bytes(), entry.entry().bytes());
297 entry = assertNext(reader);
298 assertEquals(i, entry.index());
299 assertArrayEquals(ENTRY.bytes(), entry.entry().bytes());
// Writes entriesPerSegment * 10 entries and follows them with two readers opened at index 1,
// one in Mode.ALL and one in Mode.COMMITS. After committing up to entriesPerSegment * 9 both
// readers are advanced to entriesPerSegment * 2.5, the journal is compacted at
// entriesPerSegment * 5 + 1, and both readers are expected to continue from that index.
// NOTE(review): block closers are not visible in this chunk (embedded numbering has gaps).
305 public void testReadAfterCompact() throws Exception {
306 try (SegmentedJournal<TestEntry> journal = createJournal()) {
307 JournalWriter<TestEntry> writer = journal.writer();
308 JournalReader<TestEntry> uncommittedReader = journal.openReader(1, JournalReader.Mode.ALL);
309 JournalReader<TestEntry> committedReader = journal.openReader(1, JournalReader.Mode.COMMITS);
311 for (int i = 1; i <= entriesPerSegment * 10; i++) {
312 assertEquals(i, writer.append(ENTRY).index());
315 assertEquals(1, uncommittedReader.getNextIndex());
316 assertEquals(1, committedReader.getNextIndex());
318 // This creates asymmetry, as uncommitted reader will move one step ahead...
319 assertNext(uncommittedReader);
320 assertEquals(2, uncommittedReader.getNextIndex());
// Nothing has been committed yet, so the COMMITS reader is expected to stay at index 1.
321 assertNoNext(committedReader);
322 assertEquals(1, committedReader.getNextIndex());
324 writer.commit(entriesPerSegment * 9);
326 // ... so here we catch up ...
327 assertNext(committedReader);
328 assertEquals(2, committedReader.getNextIndex());
330 // ... and continue from the second entry
// The bound is a double (2.5 literal), so the int loop index is compared in double arithmetic.
331 for (int i = 2; i <= entriesPerSegment * 2.5; i++) {
332 var entry = assertNext(uncommittedReader);
333 assertEquals(i, entry.index());
335 entry = assertNext(committedReader);
336 assertEquals(i, entry.index());
339 journal.compact(entriesPerSegment * 5 + 1);
// Both readers were behind the compaction index and are expected to have moved up to it.
341 assertEquals(entriesPerSegment * 5 + 1, uncommittedReader.getNextIndex());
342 var entry = assertNext(uncommittedReader);
343 assertEquals(entriesPerSegment * 5 + 1, entry.index());
345 assertEquals(entriesPerSegment * 5 + 1, committedReader.getNextIndex());
346 entry = assertNext(committedReader);
347 assertEquals(entriesPerSegment * 5 + 1, entry.index());
352 * Tests reading from a compacted journal.
// Two-phase test. Phase one writes entriesPerSegment * 3 entries, commits them and compacts the
// journal at entriesPerSegment + 1. Phase two creates the journal again over the same
// directory, appends two more entries, commits, and checks that a COMMITS reader opened at
// index 1 actually starts at entriesPerSegment + 1.
// NOTE(review): block closers between the two phases are not visible in this chunk (embedded
// numbering jumps from 365 to 368).
355 public void testCompactAndRecover() throws Exception {
356 try (var journal = createJournal()) {
357 // Write three segments to the journal.
358 final var writer = journal.writer();
359 for (int i = 0; i < entriesPerSegment * 3; i++) {
360 writer.append(ENTRY);
363 // Commit the entries and compact the first segment.
364 writer.commit(entriesPerSegment * 3);
365 journal.compact(entriesPerSegment + 1);
368 // Reopen the journal and create a reader.
369 try (var journal = createJournal()) {
370 final var writer = journal.writer();
371 final var reader = journal.openReader(1, JournalReader.Mode.COMMITS);
372 writer.append(ENTRY);
373 writer.append(ENTRY);
374 writer.commit(entriesPerSegment * 3);
376 // Ensure the reader starts at the first physical index in the journal.
377 assertEquals(entriesPerSegment + 1, reader.getNextIndex());
378 assertEquals(reader.getFirstIndex(), reader.getNextIndex());
// Reading one entry returns the first retained index and advances the reader by one.
379 assertEquals(entriesPerSegment + 1, assertNext(reader).index());
380 assertEquals(entriesPerSegment + 2, reader.getNextIndex());
// Walks the directory tree rooted at PATH, if it exists, with a SimpleFileVisitor whose
// visitFile and postVisitDirectory callbacks both return CONTINUE.
// NOTE(review): the statement preceding each return (embedded lines 391 and 397) is not
// visible in this chunk - presumably the visited file and directory are deleted there. The
// JUnit lifecycle annotations on this method (embedded lines 384-385) are not visible either.
// Confirm both against the full file.
386 public void cleanupStorage() throws IOException {
387 if (Files.exists(PATH)) {
388 Files.walkFileTree(PATH, new SimpleFileVisitor<Path>() {
// Invoked for every regular file below PATH.
390 public FileVisitResult visitFile(final Path file, final BasicFileAttributes attrs) throws IOException {
392 return FileVisitResult.CONTINUE;
// Invoked for a directory after all of its entries have been visited.
396 public FileVisitResult postVisitDirectory(final Path dir, final IOException exc) throws IOException {
398 return FileVisitResult.CONTINUE;
// Fetches the next entry from the reader through tryNext(); the declared return type is
// annotated NonNull.
// NOTE(review): the rest of the body (embedded lines 406-408) is not visible in this chunk -
// presumably a non-null assertion followed by returning the entry; confirm.
404 private static @NonNull Indexed<TestEntry> assertNext(final JournalReader<TestEntry> reader) {
405 final var ret = tryNext(reader);
// Asserts that the reader has no further entry to return, i.e. that tryNext() yields null.
410 private static void assertNoNext(final JournalReader<TestEntry> reader) {
411 assertNull(tryNext(reader));
// Attempts to read the next entry by delegating to reader.tryNext with the Indexed constructor
// as the mapping function. The declared return type is annotated Nullable; assertNoNext()
// relies on null meaning that no entry was available.
414 private static @Nullable Indexed<TestEntry> tryNext(final JournalReader<TestEntry> reader) {
415 return reader.tryNext(Indexed::new);