2 * Copyright 2017-present Open Networking Foundation
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 package io.atomix.storage.journal;
18 import io.atomix.storage.StorageLevel;
19 import io.atomix.utils.serializer.Namespace;
20 import org.junit.After;
21 import org.junit.Before;
22 import org.junit.Test;
23 import org.junit.runner.RunWith;
24 import org.junit.runners.Parameterized;
26 import java.io.IOException;
27 import java.nio.file.FileVisitResult;
28 import java.nio.file.Files;
29 import java.nio.file.Path;
30 import java.nio.file.Paths;
31 import java.nio.file.SimpleFileVisitor;
32 import java.nio.file.attribute.BasicFileAttributes;
33 import java.util.ArrayList;
34 import java.util.List;
36 import static org.junit.Assert.assertEquals;
37 import static org.junit.Assert.assertFalse;
38 import static org.junit.Assert.assertNotNull;
39 import static org.junit.Assert.assertNull;
40 import static org.junit.Assert.assertTrue;
45 * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
47 @RunWith(Parameterized.class)
48 public abstract class AbstractJournalTest {
// Serializer namespace handed to the journal builder in createJournal(); it registers the test
// entry type and raw byte arrays.
// NOTE(review): the call that terminates this builder chain is not visible in this view.
49 private static final Namespace NAMESPACE = Namespace.builder()
50 .register(TestEntry.class)
51 .register(byte[].class)
// Shared entry appended by most tests; the tests assert that entries read back carry 32 bytes.
54 protected static final TestEntry ENTRY = new TestEntry(32);
// Directory given to the journal builder and walked by cleanupStorage().
55 private static final Path PATH = Paths.get("target/test-logs/");
// Maximum segment size passed to the journal builder for this parameterized run.
57 private final int maxSegmentSize;
// Cache size passed to the journal builder for this parameterized run.
58 private final int cacheSize;
// Number of entries expected to fit in a single segment; computed in the constructor.
59 protected final int entriesPerSegment;
/**
 * Creates a test instance for one parameter combination and derives how many entries fit in a
 * segment of the given size.
 *
 * @param maxSegmentSize maximum segment size handed to the journal builder
 * @param cacheSize cache size handed to the journal builder
 */
61 protected AbstractJournalTest(int maxSegmentSize, int cacheSize) {
62 this.maxSegmentSize = maxSegmentSize;
63 this.cacheSize = cacheSize;
// Per-entry footprint: serialized ENTRY length plus 8 bytes
// (presumably a per-entry header written by the journal -- TODO confirm).
64 int entryLength = (NAMESPACE.serialize(ENTRY).length + 8);
// 64 bytes are excluded from the usable space before dividing
// (presumably the segment descriptor size -- TODO confirm). Integer division rounds down.
65 this.entriesPerSegment = (maxSegmentSize - 64) / entryLength;
/**
 * Supplies the storage level that createJournal() passes to the journal builder.
 * Concrete subclasses choose the level under test.
 *
 * @return the storage level to build the journal with
 */
68 protected abstract StorageLevel storageLevel();
/**
 * Builds the parameter sets for the parameterized runner: one {maxSegmentSize, cacheSize} pair
 * for every combination of i and j in 1..10, i.e. 100 pairs.
 *
 * The segment size is 64 + i * (serialized ENTRY length + 8) + j. By the constructor's formula
 * this gives entriesPerSegment == i whenever j is smaller than the per-entry length, with j
 * extra bytes of slack. The cache size is j.
 *
 * NOTE(review): despite the method name, nothing visible here involves prime numbers.
 * NOTE(review): the return statement is not visible in this view; presumably the list built
 * here is returned -- confirm.
 */
70 @Parameterized.Parameters
71 public static List<Object[]> primeNumbers() {
72 List<Object[]> runs = new ArrayList<>();
73 for (int i = 1; i <= 10; i++) {
74 for (int j = 1; j <= 10; j++) {
// The entry is re-serialized on every iteration; only the resulting length is used.
75 runs.add(new Object[]{64 + (i * (NAMESPACE.serialize(ENTRY).length + 8) + j), j});
/**
 * Configures the journal under test from the shared test directory, the test namespace, the
 * subclass-provided storage level, and this run's segment size and cache size.
 *
 * NOTE(review): the builder chain shown here is incomplete in this view (no terminating call is
 * visible); other builder options may be set on lines not shown.
 *
 * @return the journal used by the test methods
 */
81 protected SegmentedJournal<TestEntry> createJournal() {
82 return SegmentedJournal.<TestEntry>builder()
84 .withDirectory(PATH.toFile())
85 .withNamespace(NAMESPACE)
86 .withStorageLevel(storageLevel())
87 .withMaxSegmentSize(maxSegmentSize)
89 .withCacheSize(cacheSize)
/**
 * Creates a journal via createJournal().
 *
 * NOTE(review): judging by the name, this test closes the journal more than once, but only the
 * journal creation is visible in this view -- confirm against the full method body.
 */
94 public void testCloseMultipleTimes() {
96 final Journal<TestEntry> journal = createJournal();
/**
 * Appends two entries and checks the indexes reported by the writer and by readers: the writer's
 * next index before each append, the index of each entry returned by the reader, the reader's
 * current entry/current index/next index accessors, and that a reader opened again at index 1
 * sees both entries.
 *
 * NOTE(review): several assertions only hold if the reader position is changed (and the journal
 * truncated) between steps; those calls are not visible in this view -- confirm against the full
 * method body.
 */
106 public void testWriteRead() throws Exception {
107 try (Journal<TestEntry> journal = createJournal()) {
108 JournalWriter<TestEntry> writer = journal.writer();
109 JournalReader<TestEntry> reader = journal.openReader(1);
111 // Append a couple entries.
112 Indexed<TestEntry> indexed;
113 assertEquals(1, writer.getNextIndex());
114 indexed = writer.append(ENTRY);
115 assertEquals(1, indexed.index());
117 assertEquals(2, writer.getNextIndex());
// The second entry is appended as a pre-built Indexed carrying an explicit index of 2.
118 writer.append(new Indexed<>(2, ENTRY, 0));
// Expects the reader to return index 2 here, so the reader is presumably positioned at 2
// before this read -- TODO confirm.
120 indexed = reader.next();
121 assertEquals(2, indexed.index());
122 assertFalse(reader.hasNext());
124 // Test reading an entry
125 Indexed<TestEntry> entry1;
127 entry1 = reader.next();
128 assertEquals(1, entry1.index());
129 assertEquals(entry1, reader.getCurrentEntry());
130 assertEquals(1, reader.getCurrentIndex());
132 // Test reading a second entry
133 Indexed<TestEntry> entry2;
134 assertTrue(reader.hasNext());
135 assertEquals(2, reader.getNextIndex());
136 entry2 = reader.next();
137 assertEquals(2, entry2.index());
138 assertEquals(entry2, reader.getCurrentEntry());
139 assertEquals(2, reader.getCurrentIndex());
140 assertFalse(reader.hasNext());
142 // Test opening a new reader and reading from the journal.
143 reader = journal.openReader(1);
144 assertTrue(reader.hasNext());
145 entry1 = reader.next();
146 assertEquals(1, entry1.index());
147 assertEquals(entry1, reader.getCurrentEntry());
148 assertEquals(1, reader.getCurrentIndex());
149 assertTrue(reader.hasNext());
151 assertTrue(reader.hasNext());
152 assertEquals(2, reader.getNextIndex());
153 entry2 = reader.next();
154 assertEquals(2, entry2.index());
155 assertEquals(entry2, reader.getCurrentEntry());
156 assertEquals(2, reader.getCurrentIndex());
157 assertFalse(reader.hasNext());
162 // Test opening a new reader and reading from the journal.
163 reader = journal.openReader(1);
164 assertTrue(reader.hasNext());
165 entry1 = reader.next();
166 assertEquals(1, entry1.index());
167 assertEquals(entry1, reader.getCurrentEntry());
168 assertEquals(1, reader.getCurrentIndex());
169 assertTrue(reader.hasNext());
171 assertTrue(reader.hasNext());
172 assertEquals(2, reader.getNextIndex());
173 entry2 = reader.next();
174 assertEquals(2, entry2.index());
175 assertEquals(entry2, reader.getCurrentEntry());
176 assertEquals(2, reader.getCurrentIndex());
177 assertFalse(reader.hasNext());
179 // Truncate the journal and write a different entry.
// The writer's next index is expected to be 2 again, i.e. entry 2 is gone at this point.
181 assertEquals(2, writer.getNextIndex());
182 writer.append(new Indexed<>(2, ENTRY, 0));
184 indexed = reader.next();
185 assertEquals(2, indexed.index());
187 // Reset the reader to a specific index and read the last entry again.
// After the reset the reader is expected to sit on entry 1 with entry 2 next.
190 assertNotNull(reader.getCurrentEntry());
191 assertEquals(1, reader.getCurrentIndex());
192 assertEquals(1, reader.getCurrentEntry().index());
193 assertTrue(reader.hasNext());
194 assertEquals(2, reader.getNextIndex());
195 entry2 = reader.next();
196 assertEquals(2, entry2.index());
197 assertEquals(entry2, reader.getCurrentEntry());
198 assertEquals(2, reader.getCurrentIndex());
199 assertFalse(reader.hasNext());
/**
 * Checks the writer's last index and last entry, and the first index returned by the reader,
 * across repeated appends to a journal that is repeatedly brought back to an empty state.
 *
 * NOTE(review): the assertions that the last index is 0 after earlier appends imply reset or
 * truncate-to-zero calls between the steps (as the method name suggests); those calls are not
 * visible in this view -- confirm against the full method body.
 */
204 public void testResetTruncateZero() throws Exception {
205 try (SegmentedJournal<TestEntry> journal = createJournal()) {
206 JournalWriter<TestEntry> writer = journal.writer();
207 JournalReader<TestEntry> reader = journal.openReader(1);
// A fresh journal reports a last index of 0.
209 assertEquals(0, writer.getLastIndex());
210 writer.append(ENTRY);
211 writer.append(ENTRY);
213 assertEquals(0, writer.getLastIndex());
214 writer.append(ENTRY);
215 assertEquals(1, reader.next().index());
217 assertEquals(0, writer.getLastIndex());
218 writer.append(ENTRY);
219 assertEquals(1, writer.getLastIndex());
220 assertEquals(1, writer.getLastEntry().index());
222 assertTrue(reader.hasNext());
223 assertEquals(1, reader.next().index());
// With a last index of 0 the writer is also expected to have no last entry.
226 assertEquals(0, writer.getLastIndex());
227 assertNull(writer.getLastEntry());
228 writer.append(ENTRY);
229 assertEquals(1, writer.getLastIndex());
230 assertEquals(1, writer.getLastEntry().index());
232 assertTrue(reader.hasNext());
233 assertEquals(1, reader.next().index());
/**
 * Writes i entries, reads the first i - 2, truncates the journal to index i - 2, and verifies
 * that the reader sees nothing further until two new entries are appended, which it then reads
 * back at indexes i - 1 and i.
 *
 * NOTE(review): i is declared on a line not visible in this view.
 */
238 public void testTruncateRead() throws Exception {
240 try (Journal<TestEntry> journal = createJournal()) {
241 JournalWriter<TestEntry> writer = journal.writer();
242 JournalReader<TestEntry> reader = journal.openReader(1);
// Fill the journal with entries 1..i, checking the index assigned to each append.
244 for (int j = 1; j <= i; j++) {
245 assertEquals(j, writer.append(new TestEntry(32)).index());
// Read everything up to the truncation point.
248 for (int j = 1; j <= i - 2; j++) {
249 assertTrue(reader.hasNext());
250 assertEquals(j, reader.next().index());
253 writer.truncate(i - 2);
// The two entries past the truncation point must no longer be readable.
255 assertFalse(reader.hasNext());
256 assertEquals(i - 1, writer.append(new TestEntry(32)).index());
257 assertEquals(i, writer.append(new TestEntry(32)).index());
259 assertTrue(reader.hasNext());
260 Indexed<TestEntry> entry = reader.next();
261 assertEquals(i - 1, entry.index());
262 assertTrue(reader.hasNext());
263 entry = reader.next();
264 assertEquals(i, entry.index());
/**
 * Appends entriesPerSegment * 5 entries one at a time and, for each index i, verifies that the
 * reader returns the entry at index i with a 32-byte payload, both on the first read and on
 * later re-reads, including after the entry is truncated away and appended again.
 *
 * NOTE(review): the repeated reads of index i and the assertions on index i - 6 depend on reader
 * repositioning calls (and presumably a guard on i) that are not visible in this view -- confirm
 * against the full method body.
 */
269 public void testWriteReadEntries() throws Exception {
270 try (Journal<TestEntry> journal = createJournal()) {
271 JournalWriter<TestEntry> writer = journal.writer();
272 JournalReader<TestEntry> reader = journal.openReader(1);
274 for (int i = 1; i <= entriesPerSegment * 5; i++) {
275 writer.append(ENTRY);
276 assertTrue(reader.hasNext());
277 Indexed<TestEntry> entry;
278 entry = reader.next();
279 assertEquals(i, entry.index());
280 assertEquals(32, entry.entry().bytes().length);
282 entry = reader.next();
283 assertEquals(i, entry.index());
284 assertEquals(32, entry.entry().bytes().length);
// Expects the reader to be positioned on entry i - 6 with entry i - 5 next.
288 assertNotNull(reader.getCurrentEntry());
289 assertEquals(i - 6, reader.getCurrentIndex());
290 assertEquals(i - 6, reader.getCurrentEntry().index());
291 assertEquals(i - 5, reader.getNextIndex());
// Drop the entry just written and append it again; it must be readable at the same index.
295 writer.truncate(i - 1);
296 writer.append(ENTRY);
298 assertTrue(reader.hasNext());
300 assertTrue(reader.hasNext());
301 entry = reader.next();
302 assertEquals(i, entry.index());
303 assertEquals(32, entry.entry().bytes().length);
/**
 * Appends entriesPerSegment * 5 entries one at a time while reading through a reader opened in
 * COMMITS mode, verifying for each index i that the reader reports nothing to read immediately
 * after the append and later returns the entry at index i with a 32-byte payload.
 *
 * NOTE(review): the switch from hasNext() == false to hasNext() == true presumably follows a
 * commit call, and the second read of index i presumably follows a reader reposition; neither
 * call is visible in this view -- confirm against the full method body.
 */
309 public void testWriteReadCommittedEntries() throws Exception {
310 try (Journal<TestEntry> journal = createJournal()) {
311 JournalWriter<TestEntry> writer = journal.writer();
312 JournalReader<TestEntry> reader = journal.openReader(1, JournalReader.Mode.COMMITS);
314 for (int i = 1; i <= entriesPerSegment * 5; i++) {
315 writer.append(ENTRY);
// The freshly appended entry is not yet visible to the COMMITS-mode reader.
316 assertFalse(reader.hasNext());
318 assertTrue(reader.hasNext());
319 Indexed<TestEntry> entry;
320 entry = reader.next();
321 assertEquals(i, entry.index());
322 assertEquals(32, entry.entry().bytes().length);
324 entry = reader.next();
325 assertEquals(i, entry.index());
326 assertEquals(32, entry.entry().bytes().length);
/**
 * Verifies reader state after compaction. Writes entriesPerSegment * 10 entries, commits up to
 * entriesPerSegment * 9, reads the first entriesPerSegment * 2.5 entries through one reader in
 * ALL mode and one in COMMITS mode, then compacts the journal at index
 * entriesPerSegment * 5 + 1. Both readers are then expected to have no current entry, a current
 * index of 0, and to continue reading from index entriesPerSegment * 5 + 1.
 */
332 public void testReadAfterCompact() throws Exception {
333 try (SegmentedJournal<TestEntry> journal = createJournal()) {
334 JournalWriter<TestEntry> writer = journal.writer();
335 JournalReader<TestEntry> uncommittedReader = journal.openReader(1, JournalReader.Mode.ALL);
336 JournalReader<TestEntry> committedReader = journal.openReader(1, JournalReader.Mode.COMMITS);
338 for (int i = 1; i <= entriesPerSegment * 10; i++) {
339 assertEquals(i, writer.append(ENTRY).index());
// Before any commit, only the ALL-mode reader has something to read.
342 assertEquals(1, uncommittedReader.getNextIndex());
343 assertTrue(uncommittedReader.hasNext());
344 assertEquals(1, committedReader.getNextIndex());
345 assertFalse(committedReader.hasNext());
347 writer.commit(entriesPerSegment * 9);
349 assertTrue(uncommittedReader.hasNext());
350 assertTrue(committedReader.hasNext());
// The bound is a double (entriesPerSegment * 2.5); the int loop variable is compared against it.
352 for (int i = 1; i <= entriesPerSegment * 2.5; i++) {
353 assertEquals(i, uncommittedReader.next().index());
354 assertEquals(i, committedReader.next().index());
// Compact at an index beyond the point both readers have reached.
357 journal.compact(entriesPerSegment * 5 + 1);
359 assertNull(uncommittedReader.getCurrentEntry());
360 assertEquals(0, uncommittedReader.getCurrentIndex());
361 assertTrue(uncommittedReader.hasNext());
362 assertEquals(entriesPerSegment * 5 + 1, uncommittedReader.getNextIndex());
363 assertEquals(entriesPerSegment * 5 + 1, uncommittedReader.next().index());
365 assertNull(committedReader.getCurrentEntry());
366 assertEquals(0, committedReader.getCurrentIndex());
367 assertTrue(committedReader.hasNext());
368 assertEquals(entriesPerSegment * 5 + 1, committedReader.getNextIndex());
369 assertEquals(entriesPerSegment * 5 + 1, committedReader.next().index());
/**
 * Walks the test log directory, if it exists, visiting every file and then every directory
 * after its contents.
 *
 * NOTE(review): presumably each callback deletes the visited path so that tests start from an
 * empty directory, but only the CONTINUE results are visible in this view -- confirm against the
 * full method body.
 *
 * @throws IOException if walking the file tree fails
 */
375 public void cleanupStorage() throws IOException {
376 if (Files.exists(PATH)) {
377 Files.walkFileTree(PATH, new SimpleFileVisitor<Path>() {
// Called once per regular file found under PATH.
379 public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
381 return FileVisitResult.CONTINUE;
// Called for each directory after all of its entries have been visited.
385 public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
387 return FileVisitResult.CONTINUE;