2 * Copyright 2017-2021 Open Networking Foundation
3 * Copyright 2023 PANTHEON.tech, s.r.o.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 package io.atomix.storage.journal;
19 import static org.junit.Assert.assertArrayEquals;
20 import static org.junit.Assert.assertEquals;
21 import static org.junit.Assert.assertNotNull;
22 import static org.junit.Assert.assertNull;
24 import java.io.IOException;
25 import java.nio.file.FileVisitResult;
26 import java.nio.file.Files;
27 import java.nio.file.Path;
28 import java.nio.file.Paths;
29 import java.nio.file.SimpleFileVisitor;
30 import java.nio.file.attribute.BasicFileAttributes;
31 import java.util.ArrayList;
32 import java.util.List;
33 import org.eclipse.jdt.annotation.NonNull;
34 import org.eclipse.jdt.annotation.Nullable;
35 import org.junit.After;
36 import org.junit.Before;
37 import org.junit.Test;
38 import org.junit.runner.RunWith;
39 import org.junit.runners.Parameterized;
44 * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
46 @RunWith(Parameterized.class)
47 public abstract class AbstractJournalTest {
48 @Deprecated(forRemoval = true, since="9.0.3")
49 private static final JournalSerdes NAMESPACE = JournalSerdes.builder()
50 .register(new TestEntrySerdes(), TestEntry.class)
51 .register(new ByteArraySerdes(), byte[].class)
54 protected static final TestEntry ENTRY = new TestEntry(32);
55 private static final Path PATH = Paths.get("target/test-logs/");
57 private final StorageLevel storageLevel;
58 private final int maxSegmentSize;
59 protected final int entriesPerSegment;
61 protected AbstractJournalTest(final StorageLevel storageLevel, final int maxSegmentSize) {
62 this.storageLevel = storageLevel;
63 this.maxSegmentSize = maxSegmentSize;
64 int entryLength = NAMESPACE.serialize(ENTRY).length + 8;
65 entriesPerSegment = (maxSegmentSize - 64) / entryLength;
68 @Parameterized.Parameters
69 public static List<Object[]> primeNumbers() {
70 var runs = new ArrayList<Object[]>();
71 for (int i = 1; i <= 10; i++) {
72 for (int j = 1; j <= 10; j++) {
73 runs.add(new Object[] { 64 + i * (NAMESPACE.serialize(ENTRY).length + 8) + j });
79 protected SegmentedJournal<TestEntry> createJournal() {
80 return SegmentedJournal.<TestEntry>builder()
82 .withDirectory(PATH.toFile())
83 .withNamespace(NAMESPACE)
84 .withStorageLevel(storageLevel)
85 .withMaxSegmentSize(maxSegmentSize)
91 public void testCloseMultipleTimes() {
93 final Journal<TestEntry> journal = createJournal();
103 public void testWriteRead() throws Exception {
104 try (Journal<TestEntry> journal = createJournal()) {
105 JournalWriter<TestEntry> writer = journal.writer();
106 JournalReader<TestEntry> reader = journal.openReader(1);
108 // Append a couple entries.
109 assertEquals(1, writer.getNextIndex());
110 var indexed = writer.append(ENTRY);
111 assertEquals(1, indexed.index());
113 assertEquals(2, writer.getNextIndex());
114 writer.append(ENTRY);
116 indexed = assertNext(reader);
117 assertEquals(2, indexed.index());
118 assertNoNext(reader);
120 // Test reading an entry
122 var entry1 = assertNext(reader);
123 assertEquals(1, entry1.index());
125 // Test reading a second entry
126 assertEquals(2, reader.getNextIndex());
127 var entry2 = assertNext(reader);
128 assertEquals(2, entry2.index());
129 assertEquals(3, reader.getNextIndex());
130 assertNoNext(reader);
132 // Test opening a new reader and reading from the journal.
133 reader = journal.openReader(1);
134 entry1 = assertNext(reader);
135 assertEquals(1, entry1.index());
137 assertEquals(2, reader.getNextIndex());
138 entry2 = assertNext(reader);
139 assertEquals(2, entry2.index());
140 assertNoNext(reader);
145 // Test opening a new reader and reading from the journal.
146 reader = journal.openReader(1);
147 entry1 = assertNext(reader);
148 assertEquals(1, entry1.index());
150 assertEquals(2, reader.getNextIndex());
151 entry2 = assertNext(reader);
152 assertEquals(2, entry2.index());
153 assertNoNext(reader);
155 // Truncate the journal and write a different entry.
157 assertEquals(2, writer.getNextIndex());
158 writer.append(ENTRY);
160 indexed = assertNext(reader);
161 assertEquals(2, indexed.index());
163 // Reset the reader to a specific index and read the last entry again.
166 assertEquals(2, reader.getNextIndex());
167 entry2 = assertNext(reader);
168 assertEquals(2, entry2.index());
169 assertNoNext(reader);
174 public void testResetTruncateZero() throws Exception {
175 try (SegmentedJournal<TestEntry> journal = createJournal()) {
176 JournalWriter<TestEntry> writer = journal.writer();
177 JournalReader<TestEntry> reader = journal.openReader(1);
179 assertEquals(0, writer.getLastIndex());
180 writer.append(ENTRY);
181 writer.append(ENTRY);
183 assertEquals(0, writer.getLastIndex());
184 writer.append(ENTRY);
186 var indexed = assertNext(reader);
187 assertEquals(1, indexed.index());
189 assertEquals(0, writer.getLastIndex());
190 indexed = writer.append(ENTRY);
191 assertEquals(1, writer.getLastIndex());
192 assertEquals(1, indexed.index());
194 indexed = assertNext(reader);
195 assertEquals(1, indexed.index());
198 assertEquals(0, writer.getLastIndex());
199 indexed = writer.append(ENTRY);
200 assertEquals(1, writer.getLastIndex());
201 assertEquals(1, indexed.index());
203 indexed = assertNext(reader);
204 assertEquals(1, indexed.index());
209 public void testTruncateRead() throws Exception {
211 try (Journal<TestEntry> journal = createJournal()) {
212 JournalWriter<TestEntry> writer = journal.writer();
213 JournalReader<TestEntry> reader = journal.openReader(1);
215 for (int j = 1; j <= i; j++) {
216 assertEquals(j, writer.append(new TestEntry(32)).index());
219 for (int j = 1; j <= i - 2; j++) {
220 assertEquals(j, assertNext(reader).index());
223 writer.truncate(i - 2);
225 assertNoNext(reader);
226 assertEquals(i - 1, writer.append(new TestEntry(32)).index());
227 assertEquals(i, writer.append(new TestEntry(32)).index());
229 var entry = assertNext(reader);
230 assertEquals(i - 1, entry.index());
231 entry = assertNext(reader);
232 assertNotNull(entry);
233 assertEquals(i, entry.index());
238 public void testWriteReadEntries() throws Exception {
239 try (Journal<TestEntry> journal = createJournal()) {
240 JournalWriter<TestEntry> writer = journal.writer();
241 JournalReader<TestEntry> reader = journal.openReader(1);
243 for (int i = 1; i <= entriesPerSegment * 5; i++) {
244 writer.append(ENTRY);
245 var entry = assertNext(reader);
246 assertEquals(i, entry.index());
247 assertArrayEquals(ENTRY.bytes(), entry.entry().bytes());
249 entry = assertNext(reader);
250 assertEquals(i, entry.index());
251 assertArrayEquals(ENTRY.bytes(), entry.entry().bytes());
255 assertEquals(i - 5, reader.getNextIndex());
260 writer.truncate(i - 1);
261 writer.append(ENTRY);
265 entry = assertNext(reader);
266 assertEquals(i, entry.index());
267 assertArrayEquals(ENTRY.bytes(), entry.entry().bytes());
273 public void testWriteReadCommittedEntries() throws Exception {
274 try (Journal<TestEntry> journal = createJournal()) {
275 JournalWriter<TestEntry> writer = journal.writer();
276 JournalReader<TestEntry> reader = journal.openReader(1, JournalReader.Mode.COMMITS);
278 for (int i = 1; i <= entriesPerSegment * 5; i++) {
279 writer.append(ENTRY);
280 assertNoNext(reader);
282 var entry = assertNext(reader);
283 assertEquals(i, entry.index());
284 assertArrayEquals(ENTRY.bytes(), entry.entry().bytes());
286 entry = assertNext(reader);
287 assertEquals(i, entry.index());
288 assertArrayEquals(ENTRY.bytes(), entry.entry().bytes());
294 public void testReadAfterCompact() throws Exception {
295 try (SegmentedJournal<TestEntry> journal = createJournal()) {
296 JournalWriter<TestEntry> writer = journal.writer();
297 JournalReader<TestEntry> uncommittedReader = journal.openReader(1, JournalReader.Mode.ALL);
298 JournalReader<TestEntry> committedReader = journal.openReader(1, JournalReader.Mode.COMMITS);
300 for (int i = 1; i <= entriesPerSegment * 10; i++) {
301 assertEquals(i, writer.append(ENTRY).index());
304 assertEquals(1, uncommittedReader.getNextIndex());
305 assertEquals(1, committedReader.getNextIndex());
307 // This creates asymmetry, as uncommitted reader will move one step ahead...
308 assertNext(uncommittedReader);
309 assertEquals(2, uncommittedReader.getNextIndex());
310 assertNoNext(committedReader);
311 assertEquals(1, committedReader.getNextIndex());
313 writer.commit(entriesPerSegment * 9);
315 // ... so here we catch up ...
316 assertNext(committedReader);
317 assertEquals(2, committedReader.getNextIndex());
319 // ... and continue from the second entry
320 for (int i = 2; i <= entriesPerSegment * 2.5; i++) {
321 var entry = assertNext(uncommittedReader);
322 assertEquals(i, entry.index());
324 entry = assertNext(committedReader);
325 assertEquals(i, entry.index());
328 journal.compact(entriesPerSegment * 5 + 1);
330 assertEquals(entriesPerSegment * 5 + 1, uncommittedReader.getNextIndex());
331 var entry = assertNext(uncommittedReader);
332 assertEquals(entriesPerSegment * 5 + 1, entry.index());
334 assertEquals(entriesPerSegment * 5 + 1, committedReader.getNextIndex());
335 entry = assertNext(committedReader);
336 assertEquals(entriesPerSegment * 5 + 1, entry.index());
341 * Tests reading from a compacted journal.
344 public void testCompactAndRecover() throws Exception {
345 try (var journal = createJournal()) {
346 // Write three segments to the journal.
347 final var writer = journal.writer();
348 for (int i = 0; i < entriesPerSegment * 3; i++) {
349 writer.append(ENTRY);
352 // Commit the entries and compact the first segment.
353 writer.commit(entriesPerSegment * 3);
354 journal.compact(entriesPerSegment + 1);
357 // Reopen the journal and create a reader.
358 try (var journal = createJournal()) {
359 final var writer = journal.writer();
360 final var reader = journal.openReader(1, JournalReader.Mode.COMMITS);
361 writer.append(ENTRY);
362 writer.append(ENTRY);
363 writer.commit(entriesPerSegment * 3);
365 // Ensure the reader starts at the first physical index in the journal.
366 assertEquals(entriesPerSegment + 1, reader.getNextIndex());
367 assertEquals(reader.getFirstIndex(), reader.getNextIndex());
368 assertEquals(entriesPerSegment + 1, assertNext(reader).index());
369 assertEquals(entriesPerSegment + 2, reader.getNextIndex());
375 public void cleanupStorage() throws IOException {
376 if (Files.exists(PATH)) {
377 Files.walkFileTree(PATH, new SimpleFileVisitor<Path>() {
379 public FileVisitResult visitFile(final Path file, final BasicFileAttributes attrs) throws IOException {
381 return FileVisitResult.CONTINUE;
385 public FileVisitResult postVisitDirectory(final Path dir, final IOException exc) throws IOException {
387 return FileVisitResult.CONTINUE;
393 private static @NonNull Indexed<TestEntry> assertNext(final JournalReader<TestEntry> reader) {
394 final var ret = tryNext(reader);
399 private static void assertNoNext(final JournalReader<TestEntry> reader) {
400 assertNull(tryNext(reader));
403 private static @Nullable Indexed<TestEntry> tryNext(final JournalReader<TestEntry> reader) {
404 return reader.tryNext(Indexed::new);