2 * Copyright 2017-2021 Open Networking Foundation
3 * Copyright 2023 PANTHEON.tech, s.r.o.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 package io.atomix.storage.journal;
19 import static org.junit.Assert.assertEquals;
20 import static org.junit.Assert.assertFalse;
21 import static org.junit.Assert.assertNotNull;
22 import static org.junit.Assert.assertNull;
23 import static org.junit.Assert.assertTrue;
25 import com.esotericsoftware.kryo.serializers.DefaultArraySerializers.ByteArraySerializer;
26 import java.io.IOException;
27 import java.nio.file.FileVisitResult;
28 import java.nio.file.Files;
29 import java.nio.file.Path;
30 import java.nio.file.Paths;
31 import java.nio.file.SimpleFileVisitor;
32 import java.nio.file.attribute.BasicFileAttributes;
33 import java.util.ArrayList;
34 import java.util.List;
35 import org.junit.After;
36 import org.junit.Before;
37 import org.junit.Test;
38 import org.junit.runner.RunWith;
39 import org.junit.runners.Parameterized;
44 * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
46 @RunWith(Parameterized.class)
47 public abstract class AbstractJournalTest {
48 private static final JournalSerdes NAMESPACE = JournalSerdes.builder()
49 .register(new TestEntrySerializer(), TestEntry.class)
50 .register(new ByteArraySerializer(), byte[].class)
53 protected static final TestEntry ENTRY = new TestEntry(32);
54 private static final Path PATH = Paths.get("target/test-logs/");
56 private final int maxSegmentSize;
57 protected final int entriesPerSegment;
59 protected AbstractJournalTest(final int maxSegmentSize) {
60 this.maxSegmentSize = maxSegmentSize;
61 int entryLength = NAMESPACE.serialize(ENTRY).length + 8;
62 entriesPerSegment = (maxSegmentSize - 64) / entryLength;
65 protected abstract StorageLevel storageLevel();
67 @Parameterized.Parameters
68 public static List<Object[]> primeNumbers() {
69 List<Object[]> runs = new ArrayList<>();
70 for (int i = 1; i <= 10; i++) {
71 for (int j = 1; j <= 10; j++) {
72 runs.add(new Object[]{64 + i * (NAMESPACE.serialize(ENTRY).length + 8) + j});
78 protected SegmentedJournal<TestEntry> createJournal() {
79 return SegmentedJournal.<TestEntry>builder()
81 .withDirectory(PATH.toFile())
82 .withNamespace(NAMESPACE)
83 .withStorageLevel(storageLevel())
84 .withMaxSegmentSize(maxSegmentSize)
90 public void testCloseMultipleTimes() {
92 final Journal<TestEntry> journal = createJournal();
102 public void testWriteRead() throws Exception {
103 try (Journal<TestEntry> journal = createJournal()) {
104 JournalWriter<TestEntry> writer = journal.writer();
105 JournalReader<TestEntry> reader = journal.openReader(1);
107 // Append a couple entries.
108 Indexed<TestEntry> indexed;
109 assertEquals(1, writer.getNextIndex());
110 indexed = writer.append(ENTRY);
111 assertEquals(1, indexed.index());
113 assertEquals(2, writer.getNextIndex());
114 writer.append(new Indexed<>(2, ENTRY, 0));
116 indexed = reader.next();
117 assertEquals(2, indexed.index());
118 assertFalse(reader.hasNext());
120 // Test reading an entry
121 Indexed<TestEntry> entry1;
123 entry1 = reader.next();
124 assertEquals(1, entry1.index());
125 assertEquals(entry1, reader.getCurrentEntry());
126 assertEquals(1, reader.getCurrentIndex());
128 // Test reading a second entry
129 Indexed<TestEntry> entry2;
130 assertTrue(reader.hasNext());
131 assertEquals(2, reader.getNextIndex());
132 entry2 = reader.next();
133 assertEquals(2, entry2.index());
134 assertEquals(entry2, reader.getCurrentEntry());
135 assertEquals(2, reader.getCurrentIndex());
136 assertFalse(reader.hasNext());
138 // Test opening a new reader and reading from the journal.
139 reader = journal.openReader(1);
140 assertTrue(reader.hasNext());
141 entry1 = reader.next();
142 assertEquals(1, entry1.index());
143 assertEquals(entry1, reader.getCurrentEntry());
144 assertEquals(1, reader.getCurrentIndex());
145 assertTrue(reader.hasNext());
147 assertTrue(reader.hasNext());
148 assertEquals(2, reader.getNextIndex());
149 entry2 = reader.next();
150 assertEquals(2, entry2.index());
151 assertEquals(entry2, reader.getCurrentEntry());
152 assertEquals(2, reader.getCurrentIndex());
153 assertFalse(reader.hasNext());
158 // Test opening a new reader and reading from the journal.
159 reader = journal.openReader(1);
160 assertTrue(reader.hasNext());
161 entry1 = reader.next();
162 assertEquals(1, entry1.index());
163 assertEquals(entry1, reader.getCurrentEntry());
164 assertEquals(1, reader.getCurrentIndex());
165 assertTrue(reader.hasNext());
167 assertTrue(reader.hasNext());
168 assertEquals(2, reader.getNextIndex());
169 entry2 = reader.next();
170 assertEquals(2, entry2.index());
171 assertEquals(entry2, reader.getCurrentEntry());
172 assertEquals(2, reader.getCurrentIndex());
173 assertFalse(reader.hasNext());
175 // Truncate the journal and write a different entry.
177 assertEquals(2, writer.getNextIndex());
178 writer.append(new Indexed<>(2, ENTRY, 0));
180 indexed = reader.next();
181 assertEquals(2, indexed.index());
183 // Reset the reader to a specific index and read the last entry again.
186 assertNotNull(reader.getCurrentEntry());
187 assertEquals(1, reader.getCurrentIndex());
188 assertEquals(1, reader.getCurrentEntry().index());
189 assertTrue(reader.hasNext());
190 assertEquals(2, reader.getNextIndex());
191 entry2 = reader.next();
192 assertEquals(2, entry2.index());
193 assertEquals(entry2, reader.getCurrentEntry());
194 assertEquals(2, reader.getCurrentIndex());
195 assertFalse(reader.hasNext());
200 public void testResetTruncateZero() throws Exception {
201 try (SegmentedJournal<TestEntry> journal = createJournal()) {
202 JournalWriter<TestEntry> writer = journal.writer();
203 JournalReader<TestEntry> reader = journal.openReader(1);
205 assertEquals(0, writer.getLastIndex());
206 writer.append(ENTRY);
207 writer.append(ENTRY);
209 assertEquals(0, writer.getLastIndex());
210 writer.append(ENTRY);
211 assertEquals(1, reader.next().index());
213 assertEquals(0, writer.getLastIndex());
214 writer.append(ENTRY);
215 assertEquals(1, writer.getLastIndex());
216 assertEquals(1, writer.getLastEntry().index());
218 assertTrue(reader.hasNext());
219 assertEquals(1, reader.next().index());
222 assertEquals(0, writer.getLastIndex());
223 assertNull(writer.getLastEntry());
224 writer.append(ENTRY);
225 assertEquals(1, writer.getLastIndex());
226 assertEquals(1, writer.getLastEntry().index());
228 assertTrue(reader.hasNext());
229 assertEquals(1, reader.next().index());
234 public void testTruncateRead() throws Exception {
236 try (Journal<TestEntry> journal = createJournal()) {
237 JournalWriter<TestEntry> writer = journal.writer();
238 JournalReader<TestEntry> reader = journal.openReader(1);
240 for (int j = 1; j <= i; j++) {
241 assertEquals(j, writer.append(new TestEntry(32)).index());
244 for (int j = 1; j <= i - 2; j++) {
245 assertTrue(reader.hasNext());
246 assertEquals(j, reader.next().index());
249 writer.truncate(i - 2);
251 assertFalse(reader.hasNext());
252 assertEquals(i - 1, writer.append(new TestEntry(32)).index());
253 assertEquals(i, writer.append(new TestEntry(32)).index());
255 assertTrue(reader.hasNext());
256 Indexed<TestEntry> entry = reader.next();
257 assertEquals(i - 1, entry.index());
258 assertTrue(reader.hasNext());
259 entry = reader.next();
260 assertEquals(i, entry.index());
265 public void testWriteReadEntries() throws Exception {
266 try (Journal<TestEntry> journal = createJournal()) {
267 JournalWriter<TestEntry> writer = journal.writer();
268 JournalReader<TestEntry> reader = journal.openReader(1);
270 for (int i = 1; i <= entriesPerSegment * 5; i++) {
271 writer.append(ENTRY);
272 assertTrue(reader.hasNext());
273 Indexed<TestEntry> entry;
274 entry = reader.next();
275 assertEquals(i, entry.index());
276 assertEquals(32, entry.entry().bytes().length);
278 entry = reader.next();
279 assertEquals(i, entry.index());
280 assertEquals(32, entry.entry().bytes().length);
284 assertNotNull(reader.getCurrentEntry());
285 assertEquals(i - 6, reader.getCurrentIndex());
286 assertEquals(i - 6, reader.getCurrentEntry().index());
287 assertEquals(i - 5, reader.getNextIndex());
291 writer.truncate(i - 1);
292 writer.append(ENTRY);
294 assertTrue(reader.hasNext());
296 assertTrue(reader.hasNext());
297 entry = reader.next();
298 assertEquals(i, entry.index());
299 assertEquals(32, entry.entry().bytes().length);
305 public void testWriteReadCommittedEntries() throws Exception {
306 try (Journal<TestEntry> journal = createJournal()) {
307 JournalWriter<TestEntry> writer = journal.writer();
308 JournalReader<TestEntry> reader = journal.openReader(1, JournalReader.Mode.COMMITS);
310 for (int i = 1; i <= entriesPerSegment * 5; i++) {
311 writer.append(ENTRY);
312 assertFalse(reader.hasNext());
314 assertTrue(reader.hasNext());
315 Indexed<TestEntry> entry;
316 entry = reader.next();
317 assertEquals(i, entry.index());
318 assertEquals(32, entry.entry().bytes().length);
320 entry = reader.next();
321 assertEquals(i, entry.index());
322 assertEquals(32, entry.entry().bytes().length);
328 public void testReadAfterCompact() throws Exception {
329 try (SegmentedJournal<TestEntry> journal = createJournal()) {
330 JournalWriter<TestEntry> writer = journal.writer();
331 JournalReader<TestEntry> uncommittedReader = journal.openReader(1, JournalReader.Mode.ALL);
332 JournalReader<TestEntry> committedReader = journal.openReader(1, JournalReader.Mode.COMMITS);
334 for (int i = 1; i <= entriesPerSegment * 10; i++) {
335 assertEquals(i, writer.append(ENTRY).index());
338 assertEquals(1, uncommittedReader.getNextIndex());
339 assertTrue(uncommittedReader.hasNext());
340 assertEquals(1, committedReader.getNextIndex());
341 assertFalse(committedReader.hasNext());
343 writer.commit(entriesPerSegment * 9);
345 assertTrue(uncommittedReader.hasNext());
346 assertTrue(committedReader.hasNext());
348 for (int i = 1; i <= entriesPerSegment * 2.5; i++) {
349 assertEquals(i, uncommittedReader.next().index());
350 assertEquals(i, committedReader.next().index());
353 journal.compact(entriesPerSegment * 5 + 1);
355 assertNull(uncommittedReader.getCurrentEntry());
356 assertEquals(0, uncommittedReader.getCurrentIndex());
357 assertTrue(uncommittedReader.hasNext());
358 assertEquals(entriesPerSegment * 5 + 1, uncommittedReader.getNextIndex());
359 assertEquals(entriesPerSegment * 5 + 1, uncommittedReader.next().index());
361 assertNull(committedReader.getCurrentEntry());
362 assertEquals(0, committedReader.getCurrentIndex());
363 assertTrue(committedReader.hasNext());
364 assertEquals(entriesPerSegment * 5 + 1, committedReader.getNextIndex());
365 assertEquals(entriesPerSegment * 5 + 1, committedReader.next().index());
371 public void cleanupStorage() throws IOException {
372 if (Files.exists(PATH)) {
373 Files.walkFileTree(PATH, new SimpleFileVisitor<Path>() {
375 public FileVisitResult visitFile(final Path file, final BasicFileAttributes attrs) throws IOException {
377 return FileVisitResult.CONTINUE;
381 public FileVisitResult postVisitDirectory(final Path dir, final IOException exc) throws IOException {
383 return FileVisitResult.CONTINUE;