/* * Copyright 2017-2021 Open Networking Foundation * Copyright 2023 PANTHEON.tech, s.r.o. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package io.atomix.storage.journal; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import java.io.IOException; import java.nio.file.FileVisitResult; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.nio.file.SimpleFileVisitor; import java.nio.file.attribute.BasicFileAttributes; import java.util.ArrayList; import java.util.List; import org.junit.After; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; /** * Base journal test. * * @author Jordan Halterman */ @RunWith(Parameterized.class) public abstract class AbstractJournalTest { private static final JournalSerdes NAMESPACE = JournalSerdes.builder() .register(new TestEntrySerdes(), TestEntry.class) .register(new ByteArraySerdes(), byte[].class) .build(); protected static final TestEntry ENTRY = new TestEntry(32); private static final Path PATH = Paths.get("target/test-logs/"); private final int maxSegmentSize; protected final int entriesPerSegment; protected AbstractJournalTest(final int maxSegmentSize) { this.maxSegmentSize = maxSegmentSize; int entryLength = NAMESPACE.serialize(ENTRY).length + 8; entriesPerSegment = (maxSegmentSize - 64) / entryLength; } protected abstract StorageLevel storageLevel(); @Parameterized.Parameters public static List primeNumbers() { List runs = new ArrayList<>(); for (int i = 1; i <= 10; i++) { for (int j = 1; j <= 10; j++) { runs.add(new Object[]{64 + i * (NAMESPACE.serialize(ENTRY).length + 8) + j}); } } return runs; } protected SegmentedJournal createJournal() { return SegmentedJournal.builder() .withName("test") .withDirectory(PATH.toFile()) .withNamespace(NAMESPACE) .withStorageLevel(storageLevel()) .withMaxSegmentSize(maxSegmentSize) .withIndexDensity(.2) .build(); } @Test public void testCloseMultipleTimes() { // given final Journal journal = createJournal(); // when journal.close(); // then journal.close(); } @Test public void testWriteRead() throws Exception { try (Journal journal = createJournal()) { JournalWriter writer = journal.writer(); JournalReader reader = journal.openReader(1); // Append a couple entries. Indexed indexed; assertEquals(1, writer.getNextIndex()); indexed = writer.append(ENTRY); assertEquals(1, indexed.index()); assertEquals(2, writer.getNextIndex()); writer.append(new Indexed<>(2, ENTRY, 0)); reader.reset(2); indexed = reader.next(); assertEquals(2, indexed.index()); assertFalse(reader.hasNext()); // Test reading an entry Indexed entry1; reader.reset(); entry1 = reader.next(); assertEquals(1, entry1.index()); assertEquals(entry1, reader.getCurrentEntry()); assertEquals(1, reader.getCurrentIndex()); // Test reading a second entry Indexed entry2; assertTrue(reader.hasNext()); assertEquals(2, reader.getNextIndex()); entry2 = reader.next(); assertEquals(2, entry2.index()); assertEquals(entry2, reader.getCurrentEntry()); assertEquals(2, reader.getCurrentIndex()); assertFalse(reader.hasNext()); // Test opening a new reader and reading from the journal. reader = journal.openReader(1); assertTrue(reader.hasNext()); entry1 = reader.next(); assertEquals(1, entry1.index()); assertEquals(entry1, reader.getCurrentEntry()); assertEquals(1, reader.getCurrentIndex()); assertTrue(reader.hasNext()); assertTrue(reader.hasNext()); assertEquals(2, reader.getNextIndex()); entry2 = reader.next(); assertEquals(2, entry2.index()); assertEquals(entry2, reader.getCurrentEntry()); assertEquals(2, reader.getCurrentIndex()); assertFalse(reader.hasNext()); // Reset the reader. reader.reset(); // Test opening a new reader and reading from the journal. reader = journal.openReader(1); assertTrue(reader.hasNext()); entry1 = reader.next(); assertEquals(1, entry1.index()); assertEquals(entry1, reader.getCurrentEntry()); assertEquals(1, reader.getCurrentIndex()); assertTrue(reader.hasNext()); assertTrue(reader.hasNext()); assertEquals(2, reader.getNextIndex()); entry2 = reader.next(); assertEquals(2, entry2.index()); assertEquals(entry2, reader.getCurrentEntry()); assertEquals(2, reader.getCurrentIndex()); assertFalse(reader.hasNext()); // Truncate the journal and write a different entry. writer.truncate(1); assertEquals(2, writer.getNextIndex()); writer.append(new Indexed<>(2, ENTRY, 0)); reader.reset(2); indexed = reader.next(); assertEquals(2, indexed.index()); // Reset the reader to a specific index and read the last entry again. reader.reset(2); assertNotNull(reader.getCurrentEntry()); assertEquals(1, reader.getCurrentIndex()); assertEquals(1, reader.getCurrentEntry().index()); assertTrue(reader.hasNext()); assertEquals(2, reader.getNextIndex()); entry2 = reader.next(); assertEquals(2, entry2.index()); assertEquals(entry2, reader.getCurrentEntry()); assertEquals(2, reader.getCurrentIndex()); assertFalse(reader.hasNext()); } } @Test public void testResetTruncateZero() throws Exception { try (SegmentedJournal journal = createJournal()) { JournalWriter writer = journal.writer(); JournalReader reader = journal.openReader(1); assertEquals(0, writer.getLastIndex()); writer.append(ENTRY); writer.append(ENTRY); writer.reset(1); assertEquals(0, writer.getLastIndex()); writer.append(ENTRY); assertEquals(1, reader.next().index()); writer.reset(1); assertEquals(0, writer.getLastIndex()); writer.append(ENTRY); assertEquals(1, writer.getLastIndex()); assertEquals(1, writer.getLastEntry().index()); assertTrue(reader.hasNext()); assertEquals(1, reader.next().index()); writer.truncate(0); assertEquals(0, writer.getLastIndex()); assertNull(writer.getLastEntry()); writer.append(ENTRY); assertEquals(1, writer.getLastIndex()); assertEquals(1, writer.getLastEntry().index()); assertTrue(reader.hasNext()); assertEquals(1, reader.next().index()); } } @Test public void testTruncateRead() throws Exception { int i = 10; try (Journal journal = createJournal()) { JournalWriter writer = journal.writer(); JournalReader reader = journal.openReader(1); for (int j = 1; j <= i; j++) { assertEquals(j, writer.append(new TestEntry(32)).index()); } for (int j = 1; j <= i - 2; j++) { assertTrue(reader.hasNext()); assertEquals(j, reader.next().index()); } writer.truncate(i - 2); assertFalse(reader.hasNext()); assertEquals(i - 1, writer.append(new TestEntry(32)).index()); assertEquals(i, writer.append(new TestEntry(32)).index()); assertTrue(reader.hasNext()); Indexed entry = reader.next(); assertEquals(i - 1, entry.index()); assertTrue(reader.hasNext()); entry = reader.next(); assertEquals(i, entry.index()); } } @Test public void testWriteReadEntries() throws Exception { try (Journal journal = createJournal()) { JournalWriter writer = journal.writer(); JournalReader reader = journal.openReader(1); for (int i = 1; i <= entriesPerSegment * 5; i++) { writer.append(ENTRY); assertTrue(reader.hasNext()); Indexed entry; entry = reader.next(); assertEquals(i, entry.index()); assertEquals(32, entry.entry().bytes().length); reader.reset(i); entry = reader.next(); assertEquals(i, entry.index()); assertEquals(32, entry.entry().bytes().length); if (i > 6) { reader.reset(i - 5); assertNotNull(reader.getCurrentEntry()); assertEquals(i - 6, reader.getCurrentIndex()); assertEquals(i - 6, reader.getCurrentEntry().index()); assertEquals(i - 5, reader.getNextIndex()); reader.reset(i + 1); } writer.truncate(i - 1); writer.append(ENTRY); assertTrue(reader.hasNext()); reader.reset(i); assertTrue(reader.hasNext()); entry = reader.next(); assertEquals(i, entry.index()); assertEquals(32, entry.entry().bytes().length); } } } @Test public void testWriteReadCommittedEntries() throws Exception { try (Journal journal = createJournal()) { JournalWriter writer = journal.writer(); JournalReader reader = journal.openReader(1, JournalReader.Mode.COMMITS); for (int i = 1; i <= entriesPerSegment * 5; i++) { writer.append(ENTRY); assertFalse(reader.hasNext()); writer.commit(i); assertTrue(reader.hasNext()); Indexed entry; entry = reader.next(); assertEquals(i, entry.index()); assertEquals(32, entry.entry().bytes().length); reader.reset(i); entry = reader.next(); assertEquals(i, entry.index()); assertEquals(32, entry.entry().bytes().length); } } } @Test public void testReadAfterCompact() throws Exception { try (SegmentedJournal journal = createJournal()) { JournalWriter writer = journal.writer(); JournalReader uncommittedReader = journal.openReader(1, JournalReader.Mode.ALL); JournalReader committedReader = journal.openReader(1, JournalReader.Mode.COMMITS); for (int i = 1; i <= entriesPerSegment * 10; i++) { assertEquals(i, writer.append(ENTRY).index()); } assertEquals(1, uncommittedReader.getNextIndex()); assertTrue(uncommittedReader.hasNext()); assertEquals(1, committedReader.getNextIndex()); assertFalse(committedReader.hasNext()); writer.commit(entriesPerSegment * 9); assertTrue(uncommittedReader.hasNext()); assertTrue(committedReader.hasNext()); for (int i = 1; i <= entriesPerSegment * 2.5; i++) { assertEquals(i, uncommittedReader.next().index()); assertEquals(i, committedReader.next().index()); } journal.compact(entriesPerSegment * 5 + 1); assertNull(uncommittedReader.getCurrentEntry()); assertEquals(0, uncommittedReader.getCurrentIndex()); assertTrue(uncommittedReader.hasNext()); assertEquals(entriesPerSegment * 5 + 1, uncommittedReader.getNextIndex()); assertEquals(entriesPerSegment * 5 + 1, uncommittedReader.next().index()); assertNull(committedReader.getCurrentEntry()); assertEquals(0, committedReader.getCurrentIndex()); assertTrue(committedReader.hasNext()); assertEquals(entriesPerSegment * 5 + 1, committedReader.getNextIndex()); assertEquals(entriesPerSegment * 5 + 1, committedReader.next().index()); } } @Before @After public void cleanupStorage() throws IOException { if (Files.exists(PATH)) { Files.walkFileTree(PATH, new SimpleFileVisitor() { @Override public FileVisitResult visitFile(final Path file, final BasicFileAttributes attrs) throws IOException { Files.delete(file); return FileVisitResult.CONTINUE; } @Override public FileVisitResult postVisitDirectory(final Path dir, final IOException exc) throws IOException { Files.delete(dir); return FileVisitResult.CONTINUE; } }); } } }