2 * Copyright 2017-present Open Networking Foundation
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 package io.atomix.storage.journal;
18 import io.atomix.storage.StorageLevel;
19 import io.atomix.utils.serializer.Namespace;
20 import org.junit.After;
21 import org.junit.Before;
22 import org.junit.Test;
23 import org.junit.runner.RunWith;
24 import org.junit.runners.Parameterized;
26 import java.io.IOException;
27 import java.nio.file.FileVisitResult;
28 import java.nio.file.Files;
29 import java.nio.file.Path;
30 import java.nio.file.Paths;
31 import java.nio.file.SimpleFileVisitor;
32 import java.nio.file.attribute.BasicFileAttributes;
33 import java.util.ArrayList;
34 import java.util.Collection;
35 import java.util.List;
37 import static org.junit.Assert.assertEquals;
38 import static org.junit.Assert.assertFalse;
39 import static org.junit.Assert.assertNotNull;
40 import static org.junit.Assert.assertNull;
41 import static org.junit.Assert.assertTrue;
46 * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
// Abstract base for journal tests, run with the JUnit Parameterized runner.
// The parameter sets come from primeNumbers() and are received by the
// (maxSegmentSize, cacheSize) constructor; subclasses supply storageLevel().
48 @RunWith(Parameterized.class)
49 public abstract class AbstractJournalTest {
// Serializer namespace used by createJournal(); registers TestEntry and byte[].
// NOTE(review): the call that terminates this builder chain is not visible in this view.
50 private static final Namespace NAMESPACE = Namespace.builder()
51 .register(TestEntry.class)
52 .register(byte[].class)
// Entry appended by most tests. Constructed with 32; the read tests assert that
// entry().bytes().length is 32.
55 protected static final TestEntry ENTRY = new TestEntry(32);
// Directory handed to the journal builder and walked by cleanupStorage().
56 private static final Path PATH = Paths.get("target/test-logs/");
// Per-run parameters, forwarded to the journal builder in createJournal().
58 private final int maxSegmentSize;
59 private final int cacheSize;
// Derived in the constructor from maxSegmentSize and the serialized size of ENTRY.
60 protected final int entriesPerSegment;
/**
 * Stores the run parameters and derives how many ENTRY-sized records fit in one segment.
 *
 * @param maxSegmentSize value later passed to withMaxSegmentSize
 * @param cacheSize value later passed to withCacheSize
 */
62 protected AbstractJournalTest(int maxSegmentSize, int cacheSize) {
63 this.maxSegmentSize = maxSegmentSize;
64 this.cacheSize = cacheSize;
// Serialized length of ENTRY plus 8.
// NOTE(review): the 8 is presumably per-entry framing overhead - confirm against the segment format.
65 int entryLength = (NAMESPACE.serialize(ENTRY).length + 8);
// Integer division of (maxSegmentSize - 64) by the entry length.
// NOTE(review): the 64 is presumably the segment header size - confirm. primeNumbers() uses the same constants.
66 this.entriesPerSegment = (maxSegmentSize - 64) / entryLength;
/**
 * Supplies the storage level that createJournal() passes to withStorageLevel.
 *
 * @return the storage level chosen by the concrete test class
 */
69 protected abstract StorageLevel storageLevel();
/**
 * Builds the parameter sets for the Parameterized runner: one Object[] of
 * {maxSegmentSize, cacheSize} for every (i, j) with i and j in 1..10.
 * The segment size is 64 + i * (serialized ENTRY length + 8) + j and the cache size is j.
 *
 * NOTE(review): the method name does not describe what is generated, and the return
 * type is a raw Collection. The return statement is not visible in this view.
 */
71 @Parameterized.Parameters
72 public static Collection primeNumbers() {
73 List<Object[]> runs = new ArrayList<>();
74 for (int i = 1; i <= 10; i++) {
75 for (int j = 1; j <= 10; j++) {
// Same 64 and +8 constants as the constructor, so the constructor derives
// entriesPerSegment == i whenever j is smaller than one entry length.
76 runs.add(new Object[]{64 + (i * (NAMESPACE.serialize(ENTRY).length + 8) + j), j});
/**
 * Starts a SegmentedJournal builder configured with the test directory, the shared
 * namespace, the storage level from the subclass and the per-run size parameters.
 *
 * NOTE(review): the embedded numbering skips lines inside this chain (84, 89, 91), so
 * further builder options and the terminating call are not visible in this view.
 */
82 protected SegmentedJournal<TestEntry> createJournal() {
83 return SegmentedJournal.<TestEntry>builder()
85 .withDirectory(PATH.toFile())
86 .withNamespace(NAMESPACE)
87 .withStorageLevel(storageLevel())
88 .withMaxSegmentSize(maxSegmentSize)
90 .withCacheSize(cacheSize)
/**
 * Creates a journal through createJournal().
 *
 * NOTE(review): judging by the name, the journal is then closed more than once, but
 * those statements are not visible in this view - confirm against the full file.
 */
95 public void testCloseMultipleTimes() {
97 final Journal<TestEntry> journal = createJournal();
/**
 * Appends two entries and reads them back several times, through the original reader
 * and through freshly opened readers, checking index(), getCurrentEntry(),
 * getCurrentIndex(), getNextIndex() and hasNext() at each step.
 *
 * NOTE(review): the embedded line numbering has gaps in this method (for example 121,
 * 128, 182, 185, 190-191), so calls that reposition the reader or truncate the writer
 * are not visible in this view. The unchecked suppression covers the raw (Indexed) casts.
 */
107 @SuppressWarnings("unchecked")
108 public void testWriteRead() throws Exception {
109 try (Journal<TestEntry> journal = createJournal()) {
110 JournalWriter<TestEntry> writer = journal.writer();
111 JournalReader<TestEntry> reader = journal.openReader(1);
113 // Append a couple entries.
114 Indexed<TestEntry> indexed;
115 assertEquals(1, writer.getNextIndex());
116 indexed = writer.append(ENTRY);
117 assertEquals(1, indexed.index());
119 assertEquals(2, writer.getNextIndex());
// Second append uses an explicitly constructed Indexed with index 2.
120 writer.append(new Indexed<>(2, ENTRY, 0));
// NOTE(review): the reader was opened at 1 yet the next read is expected at index 2;
// presumably a repositioning call sits on the line missing from this view - confirm.
122 indexed = reader.next();
123 assertEquals(2, indexed.index());
124 assertFalse(reader.hasNext());
126 // Test reading an entry
127 Indexed<TestEntry> entry1;
129 entry1 = (Indexed) reader.next();
130 assertEquals(1, entry1.index());
131 assertEquals(entry1, reader.getCurrentEntry());
132 assertEquals(1, reader.getCurrentIndex());
134 // Test reading a second entry
135 Indexed<TestEntry> entry2;
136 assertTrue(reader.hasNext());
137 assertEquals(2, reader.getNextIndex());
138 entry2 = (Indexed) reader.next();
139 assertEquals(2, entry2.index());
140 assertEquals(entry2, reader.getCurrentEntry());
141 assertEquals(2, reader.getCurrentIndex());
142 assertFalse(reader.hasNext());
144 // Test opening a new reader and reading from the journal.
145 reader = journal.openReader(1);
146 assertTrue(reader.hasNext());
147 entry1 = (Indexed) reader.next();
148 assertEquals(1, entry1.index());
149 assertEquals(entry1, reader.getCurrentEntry());
150 assertEquals(1, reader.getCurrentIndex());
151 assertTrue(reader.hasNext());
153 assertTrue(reader.hasNext());
154 assertEquals(2, reader.getNextIndex());
155 entry2 = (Indexed) reader.next();
156 assertEquals(2, entry2.index());
157 assertEquals(entry2, reader.getCurrentEntry());
158 assertEquals(2, reader.getCurrentIndex());
159 assertFalse(reader.hasNext());
164 // Test opening a new reader and reading from the journal.
165 reader = journal.openReader(1);
166 assertTrue(reader.hasNext());
167 entry1 = (Indexed) reader.next();
168 assertEquals(1, entry1.index());
169 assertEquals(entry1, reader.getCurrentEntry());
170 assertEquals(1, reader.getCurrentIndex());
171 assertTrue(reader.hasNext());
173 assertTrue(reader.hasNext());
174 assertEquals(2, reader.getNextIndex());
175 entry2 = (Indexed) reader.next();
176 assertEquals(2, entry2.index());
177 assertEquals(entry2, reader.getCurrentEntry());
178 assertEquals(2, reader.getCurrentIndex());
179 assertFalse(reader.hasNext());
181 // Truncate the journal and write a different entry.
// NOTE(review): the truncate call itself is not visible here; the assertion below
// expects the writer to be back at next index 2 - confirm against the full file.
183 assertEquals(2, writer.getNextIndex());
184 writer.append(new Indexed<>(2, ENTRY, 0));
186 indexed = reader.next();
187 assertEquals(2, indexed.index());
189 // Reset the reader to a specific index and read the last entry again.
// The assertions below expect the reader to sit on entry 1 with entry 2 still
// to be read; the reset call is not visible in this view.
192 assertNotNull(reader.getCurrentEntry());
193 assertEquals(1, reader.getCurrentIndex());
194 assertEquals(1, reader.getCurrentEntry().index());
195 assertTrue(reader.hasNext());
196 assertEquals(2, reader.getNextIndex());
197 entry2 = (Indexed) reader.next();
198 assertEquals(2, entry2.index());
199 assertEquals(entry2, reader.getCurrentEntry());
200 assertEquals(2, reader.getCurrentIndex());
201 assertFalse(reader.hasNext());
/**
 * Repeatedly checks that the writer reports last index 0, appends ENTRY, and verifies
 * that the writer and reader then see the new entry at index 1.
 *
 * NOTE(review): getLastIndex() is asserted to be 0 again after appends, so the writer
 * must be reset or truncated in between (the name suggests to index zero). Those calls
 * are on lines missing from this view - confirm against the full file.
 */
206 public void testResetTruncateZero() throws Exception {
207 try (SegmentedJournal<TestEntry> journal = createJournal()) {
208 JournalWriter<TestEntry> writer = journal.writer();
209 JournalReader<TestEntry> reader = journal.openReader(1);
// A fresh journal reports last index 0.
211 assertEquals(0, writer.getLastIndex());
212 writer.append(ENTRY);
213 writer.append(ENTRY);
215 assertEquals(0, writer.getLastIndex());
216 writer.append(ENTRY);
217 assertEquals(1, reader.next().index());
219 assertEquals(0, writer.getLastIndex());
220 writer.append(ENTRY);
221 assertEquals(1, writer.getLastIndex());
222 assertEquals(1, writer.getLastEntry().index());
224 assertTrue(reader.hasNext());
225 assertEquals(1, reader.next().index());
// In this round the last entry is also asserted to be null before appending.
228 assertEquals(0, writer.getLastIndex());
229 assertNull(writer.getLastEntry());
230 writer.append(ENTRY);
231 assertEquals(1, writer.getLastIndex());
232 assertEquals(1, writer.getLastEntry().index());
234 assertTrue(reader.hasNext());
235 assertEquals(1, reader.next().index());
/**
 * Writes i entries, reads the first i - 2, truncates the writer to i - 2, and checks
 * that the reader has nothing further to read until two replacement entries are
 * appended, which it then reads at indexes i - 1 and i.
 *
 * NOTE(review): the declaration of i is not visible in this view (the embedded
 * numbering skips line 241) - confirm its value against the full file.
 */
240 public void testTruncateRead() throws Exception {
242 try (Journal<TestEntry> journal = createJournal()) {
243 JournalWriter<TestEntry> writer = journal.writer();
244 JournalReader<TestEntry> reader = journal.openReader(1);
// Append i fresh entries; each append must return consecutive indexes starting at 1.
246 for (int j = 1; j <= i; j++) {
247 assertEquals(j, writer.append(new TestEntry(32)).index());
// Read everything except the last two entries.
250 for (int j = 1; j <= i - 2; j++) {
251 assertTrue(reader.hasNext());
252 assertEquals(j, reader.next().index());
// Drop the two unread entries from the tail.
255 writer.truncate(i - 2);
// The reader must not see the truncated entries.
257 assertFalse(reader.hasNext());
258 assertEquals(i - 1, writer.append(new TestEntry(32)).index());
259 assertEquals(i, writer.append(new TestEntry(32)).index());
// The replacement entries become readable at the truncated indexes.
261 assertTrue(reader.hasNext());
262 Indexed<TestEntry> entry = reader.next();
263 assertEquals(i - 1, entry.index());
264 assertTrue(reader.hasNext());
265 entry = reader.next();
266 assertEquals(i, entry.index());
/**
 * For i from 1 to entriesPerSegment * 5: appends ENTRY, reads it back at index i with
 * a 32-byte payload, re-reads it, checks the reader state a few entries back, then
 * truncates the last entry, re-appends it and reads it once more.
 *
 * NOTE(review): the embedded numbering has gaps in the loop body (for example 284,
 * 288-290, 295-297, 302), so reader reset calls and any guard around the i - 6
 * assertions are not visible in this view. The unchecked suppression covers the raw
 * (Indexed) casts.
 */
271 @SuppressWarnings("unchecked")
272 public void testWriteReadEntries() throws Exception {
273 try (Journal<TestEntry> journal = createJournal()) {
274 JournalWriter<TestEntry> writer = journal.writer();
275 JournalReader<TestEntry> reader = journal.openReader(1);
277 for (int i = 1; i <= entriesPerSegment * 5; i++) {
278 writer.append(ENTRY);
279 assertTrue(reader.hasNext());
280 Indexed<TestEntry> entry;
281 entry = (Indexed) reader.next();
282 assertEquals(i, entry.index());
283 assertEquals(32, entry.entry().bytes().length);
// Second read of the same index.
// NOTE(review): presumably preceded by a reader reset on the missing line - confirm.
285 entry = (Indexed) reader.next();
286 assertEquals(i, entry.index());
287 assertEquals(32, entry.entry().bytes().length);
// Expects the reader to be positioned on entry i - 6 with i - 5 next.
// NOTE(review): the repositioning call and any condition on i are not visible - confirm.
291 assertNotNull(reader.getCurrentEntry());
292 assertEquals(i - 6, reader.getCurrentIndex());
293 assertEquals(i - 6, reader.getCurrentEntry().index());
294 assertEquals(i - 5, reader.getNextIndex());
// Remove the entry just written and write it again at the same index.
298 writer.truncate(i - 1);
299 writer.append(ENTRY);
301 assertTrue(reader.hasNext());
303 assertTrue(reader.hasNext());
304 entry = (Indexed) reader.next();
305 assertEquals(i, entry.index());
306 assertEquals(32, entry.entry().bytes().length);
/**
 * Same write-then-read loop as the uncommitted variant, but with a reader opened in
 * JournalReader.Mode.COMMITS: right after an append the reader reports no next entry,
 * and later in the iteration the entry is readable at index i with a 32-byte payload.
 *
 * NOTE(review): the step that makes the entry visible to the reader (presumably a
 * writer commit on the missing line 321) is not visible in this view - confirm.
 * The unchecked suppression covers the raw (Indexed) casts.
 */
312 @SuppressWarnings("unchecked")
313 public void testWriteReadCommittedEntries() throws Exception {
314 try (Journal<TestEntry> journal = createJournal()) {
315 JournalWriter<TestEntry> writer = journal.writer();
316 JournalReader<TestEntry> reader = journal.openReader(1, JournalReader.Mode.COMMITS);
318 for (int i = 1; i <= entriesPerSegment * 5; i++) {
319 writer.append(ENTRY);
// Appended entry is not yet readable in COMMITS mode.
320 assertFalse(reader.hasNext());
322 assertTrue(reader.hasNext());
323 Indexed<TestEntry> entry;
324 entry = (Indexed) reader.next();
325 assertEquals(i, entry.index());
326 assertEquals(32, entry.entry().bytes().length);
// Second read of the same index.
// NOTE(review): presumably preceded by a reader reset on the missing line - confirm.
328 entry = (Indexed) reader.next();
329 assertEquals(i, entry.index());
330 assertEquals(32, entry.entry().bytes().length);
/**
 * Fills ten segments worth of entries, commits the first nine segments worth, reads
 * part of the log with both an ALL-mode and a COMMITS-mode reader, compacts the journal
 * at index entriesPerSegment * 5 + 1, and checks that both readers then have no current
 * entry and continue from the compaction index.
 */
336 public void testReadAfterCompact() throws Exception {
337 try (SegmentedJournal<TestEntry> journal = createJournal()) {
338 JournalWriter<TestEntry> writer = journal.writer();
339 JournalReader<TestEntry> uncommittedReader = journal.openReader(1, JournalReader.Mode.ALL);
340 JournalReader<TestEntry> committedReader = journal.openReader(1, JournalReader.Mode.COMMITS);
342 for (int i = 1; i <= entriesPerSegment * 10; i++) {
343 assertEquals(i, writer.append(ENTRY).index());
// Before any commit: the ALL reader can read, the COMMITS reader cannot.
346 assertEquals(1, uncommittedReader.getNextIndex());
347 assertTrue(uncommittedReader.hasNext());
348 assertEquals(1, committedReader.getNextIndex());
349 assertFalse(committedReader.hasNext());
351 writer.commit(entriesPerSegment * 9);
353 assertTrue(uncommittedReader.hasNext());
354 assertTrue(committedReader.hasNext());
// Advance both readers in lock step. The bound is an int-times-double product, so
// the comparison is done in floating point (i is widened to double).
356 for (int i = 1; i <= entriesPerSegment * 2.5; i++) {
357 assertEquals(i, uncommittedReader.next().index());
358 assertEquals(i, committedReader.next().index());
361 journal.compact(entriesPerSegment * 5 + 1);
// After compaction each reader reports no current entry, current index 0, and
// resumes at the compaction index.
363 assertNull(uncommittedReader.getCurrentEntry());
364 assertEquals(0, uncommittedReader.getCurrentIndex());
365 assertTrue(uncommittedReader.hasNext());
366 assertEquals(entriesPerSegment * 5 + 1, uncommittedReader.getNextIndex());
367 assertEquals(entriesPerSegment * 5 + 1, uncommittedReader.next().index());
369 assertNull(committedReader.getCurrentEntry());
370 assertEquals(0, committedReader.getCurrentIndex());
371 assertTrue(committedReader.hasNext());
372 assertEquals(entriesPerSegment * 5 + 1, committedReader.getNextIndex());
373 assertEquals(entriesPerSegment * 5 + 1, committedReader.next().index());
/**
 * If the test directory PATH exists, walks its file tree with a visitor that handles
 * every file (visitFile) and every directory after its contents (postVisitDirectory),
 * continuing the walk after each one.
 *
 * NOTE(review): the action performed on each file and directory (presumably deletion)
 * and the lifecycle annotations on this method are on lines missing from this view -
 * confirm against the full file.
 *
 * @throws IOException if the file tree walk fails
 */
379 public void cleanupStorage() throws IOException {
380 if (Files.exists(PATH)) {
381 Files.walkFileTree(PATH, new SimpleFileVisitor<Path>() {
383 public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
385 return FileVisitResult.CONTINUE;
389 public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
391 return FileVisitResult.CONTINUE;