2 * Copyright 2017-2021 Open Networking Foundation
3 * Copyright 2023 PANTHEON.tech, s.r.o.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 package io.atomix.storage.journal;
19 import static org.junit.Assert.assertEquals;
20 import static org.junit.Assert.assertFalse;
21 import static org.junit.Assert.assertNotNull;
22 import static org.junit.Assert.assertNull;
23 import static org.junit.Assert.assertTrue;
25 import java.io.IOException;
26 import java.nio.file.FileVisitResult;
27 import java.nio.file.Files;
28 import java.nio.file.Path;
29 import java.nio.file.Paths;
30 import java.nio.file.SimpleFileVisitor;
31 import java.nio.file.attribute.BasicFileAttributes;
32 import java.util.ArrayList;
33 import java.util.List;
34 import org.junit.After;
35 import org.junit.Before;
36 import org.junit.Test;
37 import org.junit.runner.RunWith;
38 import org.junit.runners.Parameterized;
43 * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
45 @RunWith(Parameterized.class)
46 public abstract class AbstractJournalTest {
47 private static final JournalSerdes NAMESPACE = JournalSerdes.builder()
48 .register(new TestEntrySerdes(), TestEntry.class)
49 .register(new ByteArraySerdes(), byte[].class)
52 protected static final TestEntry ENTRY = new TestEntry(32);
53 private static final Path PATH = Paths.get("target/test-logs/");
55 private final int maxSegmentSize;
56 protected final int entriesPerSegment;
58 protected AbstractJournalTest(final int maxSegmentSize) {
59 this.maxSegmentSize = maxSegmentSize;
60 int entryLength = NAMESPACE.serialize(ENTRY).length + 8;
61 entriesPerSegment = (maxSegmentSize - 64) / entryLength;
64 protected abstract StorageLevel storageLevel();
66 @Parameterized.Parameters
67 public static List<Object[]> primeNumbers() {
68 List<Object[]> runs = new ArrayList<>();
69 for (int i = 1; i <= 10; i++) {
70 for (int j = 1; j <= 10; j++) {
71 runs.add(new Object[]{64 + i * (NAMESPACE.serialize(ENTRY).length + 8) + j});
77 protected SegmentedJournal<TestEntry> createJournal() {
78 return SegmentedJournal.<TestEntry>builder()
80 .withDirectory(PATH.toFile())
81 .withNamespace(NAMESPACE)
82 .withStorageLevel(storageLevel())
83 .withMaxSegmentSize(maxSegmentSize)
89 public void testCloseMultipleTimes() {
91 final Journal<TestEntry> journal = createJournal();
101 public void testWriteRead() throws Exception {
102 try (Journal<TestEntry> journal = createJournal()) {
103 JournalWriter<TestEntry> writer = journal.writer();
104 JournalReader<TestEntry> reader = journal.openReader(1);
106 // Append a couple entries.
107 Indexed<TestEntry> indexed;
108 assertEquals(1, writer.getNextIndex());
109 indexed = writer.append(ENTRY);
110 assertEquals(1, indexed.index());
112 assertEquals(2, writer.getNextIndex());
113 writer.append(new Indexed<>(2, ENTRY, 0));
115 indexed = reader.next();
116 assertEquals(2, indexed.index());
117 assertFalse(reader.hasNext());
119 // Test reading an entry
120 Indexed<TestEntry> entry1;
122 entry1 = reader.next();
123 assertEquals(1, entry1.index());
124 assertEquals(entry1, reader.getCurrentEntry());
125 assertEquals(1, reader.getCurrentIndex());
127 // Test reading a second entry
128 Indexed<TestEntry> entry2;
129 assertTrue(reader.hasNext());
130 assertEquals(2, reader.getNextIndex());
131 entry2 = reader.next();
132 assertEquals(2, entry2.index());
133 assertEquals(entry2, reader.getCurrentEntry());
134 assertEquals(2, reader.getCurrentIndex());
135 assertFalse(reader.hasNext());
137 // Test opening a new reader and reading from the journal.
138 reader = journal.openReader(1);
139 assertTrue(reader.hasNext());
140 entry1 = reader.next();
141 assertEquals(1, entry1.index());
142 assertEquals(entry1, reader.getCurrentEntry());
143 assertEquals(1, reader.getCurrentIndex());
144 assertTrue(reader.hasNext());
146 assertTrue(reader.hasNext());
147 assertEquals(2, reader.getNextIndex());
148 entry2 = reader.next();
149 assertEquals(2, entry2.index());
150 assertEquals(entry2, reader.getCurrentEntry());
151 assertEquals(2, reader.getCurrentIndex());
152 assertFalse(reader.hasNext());
157 // Test opening a new reader and reading from the journal.
158 reader = journal.openReader(1);
159 assertTrue(reader.hasNext());
160 entry1 = reader.next();
161 assertEquals(1, entry1.index());
162 assertEquals(entry1, reader.getCurrentEntry());
163 assertEquals(1, reader.getCurrentIndex());
164 assertTrue(reader.hasNext());
166 assertTrue(reader.hasNext());
167 assertEquals(2, reader.getNextIndex());
168 entry2 = reader.next();
169 assertEquals(2, entry2.index());
170 assertEquals(entry2, reader.getCurrentEntry());
171 assertEquals(2, reader.getCurrentIndex());
172 assertFalse(reader.hasNext());
174 // Truncate the journal and write a different entry.
176 assertEquals(2, writer.getNextIndex());
177 writer.append(new Indexed<>(2, ENTRY, 0));
179 indexed = reader.next();
180 assertEquals(2, indexed.index());
182 // Reset the reader to a specific index and read the last entry again.
185 assertNotNull(reader.getCurrentEntry());
186 assertEquals(1, reader.getCurrentIndex());
187 assertEquals(1, reader.getCurrentEntry().index());
188 assertTrue(reader.hasNext());
189 assertEquals(2, reader.getNextIndex());
190 entry2 = reader.next();
191 assertEquals(2, entry2.index());
192 assertEquals(entry2, reader.getCurrentEntry());
193 assertEquals(2, reader.getCurrentIndex());
194 assertFalse(reader.hasNext());
199 public void testResetTruncateZero() throws Exception {
200 try (SegmentedJournal<TestEntry> journal = createJournal()) {
201 JournalWriter<TestEntry> writer = journal.writer();
202 JournalReader<TestEntry> reader = journal.openReader(1);
204 assertEquals(0, writer.getLastIndex());
205 writer.append(ENTRY);
206 writer.append(ENTRY);
208 assertEquals(0, writer.getLastIndex());
209 writer.append(ENTRY);
210 assertEquals(1, reader.next().index());
212 assertEquals(0, writer.getLastIndex());
213 writer.append(ENTRY);
214 assertEquals(1, writer.getLastIndex());
215 assertEquals(1, writer.getLastEntry().index());
217 assertTrue(reader.hasNext());
218 assertEquals(1, reader.next().index());
221 assertEquals(0, writer.getLastIndex());
222 assertNull(writer.getLastEntry());
223 writer.append(ENTRY);
224 assertEquals(1, writer.getLastIndex());
225 assertEquals(1, writer.getLastEntry().index());
227 assertTrue(reader.hasNext());
228 assertEquals(1, reader.next().index());
233 public void testTruncateRead() throws Exception {
235 try (Journal<TestEntry> journal = createJournal()) {
236 JournalWriter<TestEntry> writer = journal.writer();
237 JournalReader<TestEntry> reader = journal.openReader(1);
239 for (int j = 1; j <= i; j++) {
240 assertEquals(j, writer.append(new TestEntry(32)).index());
243 for (int j = 1; j <= i - 2; j++) {
244 assertTrue(reader.hasNext());
245 assertEquals(j, reader.next().index());
248 writer.truncate(i - 2);
250 assertFalse(reader.hasNext());
251 assertEquals(i - 1, writer.append(new TestEntry(32)).index());
252 assertEquals(i, writer.append(new TestEntry(32)).index());
254 assertTrue(reader.hasNext());
255 Indexed<TestEntry> entry = reader.next();
256 assertEquals(i - 1, entry.index());
257 assertTrue(reader.hasNext());
258 entry = reader.next();
259 assertEquals(i, entry.index());
264 public void testWriteReadEntries() throws Exception {
265 try (Journal<TestEntry> journal = createJournal()) {
266 JournalWriter<TestEntry> writer = journal.writer();
267 JournalReader<TestEntry> reader = journal.openReader(1);
269 for (int i = 1; i <= entriesPerSegment * 5; i++) {
270 writer.append(ENTRY);
271 assertTrue(reader.hasNext());
272 Indexed<TestEntry> entry;
273 entry = reader.next();
274 assertEquals(i, entry.index());
275 assertEquals(32, entry.entry().bytes().length);
277 entry = reader.next();
278 assertEquals(i, entry.index());
279 assertEquals(32, entry.entry().bytes().length);
283 assertNotNull(reader.getCurrentEntry());
284 assertEquals(i - 6, reader.getCurrentIndex());
285 assertEquals(i - 6, reader.getCurrentEntry().index());
286 assertEquals(i - 5, reader.getNextIndex());
290 writer.truncate(i - 1);
291 writer.append(ENTRY);
293 assertTrue(reader.hasNext());
295 assertTrue(reader.hasNext());
296 entry = reader.next();
297 assertEquals(i, entry.index());
298 assertEquals(32, entry.entry().bytes().length);
304 public void testWriteReadCommittedEntries() throws Exception {
305 try (Journal<TestEntry> journal = createJournal()) {
306 JournalWriter<TestEntry> writer = journal.writer();
307 JournalReader<TestEntry> reader = journal.openReader(1, JournalReader.Mode.COMMITS);
309 for (int i = 1; i <= entriesPerSegment * 5; i++) {
310 writer.append(ENTRY);
311 assertFalse(reader.hasNext());
313 assertTrue(reader.hasNext());
314 Indexed<TestEntry> entry;
315 entry = reader.next();
316 assertEquals(i, entry.index());
317 assertEquals(32, entry.entry().bytes().length);
319 entry = reader.next();
320 assertEquals(i, entry.index());
321 assertEquals(32, entry.entry().bytes().length);
327 public void testReadAfterCompact() throws Exception {
328 try (SegmentedJournal<TestEntry> journal = createJournal()) {
329 JournalWriter<TestEntry> writer = journal.writer();
330 JournalReader<TestEntry> uncommittedReader = journal.openReader(1, JournalReader.Mode.ALL);
331 JournalReader<TestEntry> committedReader = journal.openReader(1, JournalReader.Mode.COMMITS);
333 for (int i = 1; i <= entriesPerSegment * 10; i++) {
334 assertEquals(i, writer.append(ENTRY).index());
337 assertEquals(1, uncommittedReader.getNextIndex());
338 assertTrue(uncommittedReader.hasNext());
339 assertEquals(1, committedReader.getNextIndex());
340 assertFalse(committedReader.hasNext());
342 writer.commit(entriesPerSegment * 9);
344 assertTrue(uncommittedReader.hasNext());
345 assertTrue(committedReader.hasNext());
347 for (int i = 1; i <= entriesPerSegment * 2.5; i++) {
348 assertEquals(i, uncommittedReader.next().index());
349 assertEquals(i, committedReader.next().index());
352 journal.compact(entriesPerSegment * 5 + 1);
354 assertNull(uncommittedReader.getCurrentEntry());
355 assertEquals(0, uncommittedReader.getCurrentIndex());
356 assertTrue(uncommittedReader.hasNext());
357 assertEquals(entriesPerSegment * 5 + 1, uncommittedReader.getNextIndex());
358 assertEquals(entriesPerSegment * 5 + 1, uncommittedReader.next().index());
360 assertNull(committedReader.getCurrentEntry());
361 assertEquals(0, committedReader.getCurrentIndex());
362 assertTrue(committedReader.hasNext());
363 assertEquals(entriesPerSegment * 5 + 1, committedReader.getNextIndex());
364 assertEquals(entriesPerSegment * 5 + 1, committedReader.next().index());
370 public void cleanupStorage() throws IOException {
371 if (Files.exists(PATH)) {
372 Files.walkFileTree(PATH, new SimpleFileVisitor<Path>() {
374 public FileVisitResult visitFile(final Path file, final BasicFileAttributes attrs) throws IOException {
376 return FileVisitResult.CONTINUE;
380 public FileVisitResult postVisitDirectory(final Path dir, final IOException exc) throws IOException {
382 return FileVisitResult.CONTINUE;