2 * Copyright 2017-present Open Networking Foundation
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 package io.atomix.storage.journal;
18 import io.atomix.utils.serializer.Namespace;
19 import org.junit.After;
20 import org.junit.Before;
21 import org.junit.Test;
22 import org.junit.runner.RunWith;
23 import org.junit.runners.Parameterized;
25 import java.io.IOException;
26 import java.nio.file.FileVisitResult;
27 import java.nio.file.Files;
28 import java.nio.file.Path;
29 import java.nio.file.Paths;
30 import java.nio.file.SimpleFileVisitor;
31 import java.nio.file.attribute.BasicFileAttributes;
32 import java.util.ArrayList;
33 import java.util.List;
35 import static org.junit.Assert.assertEquals;
36 import static org.junit.Assert.assertFalse;
37 import static org.junit.Assert.assertNotNull;
38 import static org.junit.Assert.assertNull;
39 import static org.junit.Assert.assertTrue;
44 * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
46 @RunWith(Parameterized.class)
47 public abstract class AbstractJournalTest {
48 private static final Namespace NAMESPACE = Namespace.builder()
49 .register(TestEntry.class)
50 .register(byte[].class)
53 protected static final TestEntry ENTRY = new TestEntry(32);
54 private static final Path PATH = Paths.get("target/test-logs/");
56 private final int maxSegmentSize;
57 private final int cacheSize;
58 protected final int entriesPerSegment;
60 protected AbstractJournalTest(int maxSegmentSize, int cacheSize) {
61 this.maxSegmentSize = maxSegmentSize;
62 this.cacheSize = cacheSize;
63 int entryLength = (NAMESPACE.serialize(ENTRY).length + 8);
64 this.entriesPerSegment = (maxSegmentSize - 64) / entryLength;
67 protected abstract StorageLevel storageLevel();
69 @Parameterized.Parameters
70 public static List<Object[]> primeNumbers() {
71 List<Object[]> runs = new ArrayList<>();
72 for (int i = 1; i <= 10; i++) {
73 for (int j = 1; j <= 10; j++) {
74 runs.add(new Object[]{64 + (i * (NAMESPACE.serialize(ENTRY).length + 8) + j), j});
80 protected SegmentedJournal<TestEntry> createJournal() {
81 return SegmentedJournal.<TestEntry>builder()
83 .withDirectory(PATH.toFile())
84 .withNamespace(NAMESPACE)
85 .withStorageLevel(storageLevel())
86 .withMaxSegmentSize(maxSegmentSize)
88 .withCacheSize(cacheSize)
93 public void testCloseMultipleTimes() {
95 final Journal<TestEntry> journal = createJournal();
105 public void testWriteRead() throws Exception {
106 try (Journal<TestEntry> journal = createJournal()) {
107 JournalWriter<TestEntry> writer = journal.writer();
108 JournalReader<TestEntry> reader = journal.openReader(1);
110 // Append a couple entries.
111 Indexed<TestEntry> indexed;
112 assertEquals(1, writer.getNextIndex());
113 indexed = writer.append(ENTRY);
114 assertEquals(1, indexed.index());
116 assertEquals(2, writer.getNextIndex());
117 writer.append(new Indexed<>(2, ENTRY, 0));
119 indexed = reader.next();
120 assertEquals(2, indexed.index());
121 assertFalse(reader.hasNext());
123 // Test reading an entry
124 Indexed<TestEntry> entry1;
126 entry1 = reader.next();
127 assertEquals(1, entry1.index());
128 assertEquals(entry1, reader.getCurrentEntry());
129 assertEquals(1, reader.getCurrentIndex());
131 // Test reading a second entry
132 Indexed<TestEntry> entry2;
133 assertTrue(reader.hasNext());
134 assertEquals(2, reader.getNextIndex());
135 entry2 = reader.next();
136 assertEquals(2, entry2.index());
137 assertEquals(entry2, reader.getCurrentEntry());
138 assertEquals(2, reader.getCurrentIndex());
139 assertFalse(reader.hasNext());
141 // Test opening a new reader and reading from the journal.
142 reader = journal.openReader(1);
143 assertTrue(reader.hasNext());
144 entry1 = reader.next();
145 assertEquals(1, entry1.index());
146 assertEquals(entry1, reader.getCurrentEntry());
147 assertEquals(1, reader.getCurrentIndex());
148 assertTrue(reader.hasNext());
150 assertTrue(reader.hasNext());
151 assertEquals(2, reader.getNextIndex());
152 entry2 = reader.next();
153 assertEquals(2, entry2.index());
154 assertEquals(entry2, reader.getCurrentEntry());
155 assertEquals(2, reader.getCurrentIndex());
156 assertFalse(reader.hasNext());
161 // Test opening a new reader and reading from the journal.
162 reader = journal.openReader(1);
163 assertTrue(reader.hasNext());
164 entry1 = reader.next();
165 assertEquals(1, entry1.index());
166 assertEquals(entry1, reader.getCurrentEntry());
167 assertEquals(1, reader.getCurrentIndex());
168 assertTrue(reader.hasNext());
170 assertTrue(reader.hasNext());
171 assertEquals(2, reader.getNextIndex());
172 entry2 = reader.next();
173 assertEquals(2, entry2.index());
174 assertEquals(entry2, reader.getCurrentEntry());
175 assertEquals(2, reader.getCurrentIndex());
176 assertFalse(reader.hasNext());
178 // Truncate the journal and write a different entry.
180 assertEquals(2, writer.getNextIndex());
181 writer.append(new Indexed<>(2, ENTRY, 0));
183 indexed = reader.next();
184 assertEquals(2, indexed.index());
186 // Reset the reader to a specific index and read the last entry again.
189 assertNotNull(reader.getCurrentEntry());
190 assertEquals(1, reader.getCurrentIndex());
191 assertEquals(1, reader.getCurrentEntry().index());
192 assertTrue(reader.hasNext());
193 assertEquals(2, reader.getNextIndex());
194 entry2 = reader.next();
195 assertEquals(2, entry2.index());
196 assertEquals(entry2, reader.getCurrentEntry());
197 assertEquals(2, reader.getCurrentIndex());
198 assertFalse(reader.hasNext());
203 public void testResetTruncateZero() throws Exception {
204 try (SegmentedJournal<TestEntry> journal = createJournal()) {
205 JournalWriter<TestEntry> writer = journal.writer();
206 JournalReader<TestEntry> reader = journal.openReader(1);
208 assertEquals(0, writer.getLastIndex());
209 writer.append(ENTRY);
210 writer.append(ENTRY);
212 assertEquals(0, writer.getLastIndex());
213 writer.append(ENTRY);
214 assertEquals(1, reader.next().index());
216 assertEquals(0, writer.getLastIndex());
217 writer.append(ENTRY);
218 assertEquals(1, writer.getLastIndex());
219 assertEquals(1, writer.getLastEntry().index());
221 assertTrue(reader.hasNext());
222 assertEquals(1, reader.next().index());
225 assertEquals(0, writer.getLastIndex());
226 assertNull(writer.getLastEntry());
227 writer.append(ENTRY);
228 assertEquals(1, writer.getLastIndex());
229 assertEquals(1, writer.getLastEntry().index());
231 assertTrue(reader.hasNext());
232 assertEquals(1, reader.next().index());
237 public void testTruncateRead() throws Exception {
239 try (Journal<TestEntry> journal = createJournal()) {
240 JournalWriter<TestEntry> writer = journal.writer();
241 JournalReader<TestEntry> reader = journal.openReader(1);
243 for (int j = 1; j <= i; j++) {
244 assertEquals(j, writer.append(new TestEntry(32)).index());
247 for (int j = 1; j <= i - 2; j++) {
248 assertTrue(reader.hasNext());
249 assertEquals(j, reader.next().index());
252 writer.truncate(i - 2);
254 assertFalse(reader.hasNext());
255 assertEquals(i - 1, writer.append(new TestEntry(32)).index());
256 assertEquals(i, writer.append(new TestEntry(32)).index());
258 assertTrue(reader.hasNext());
259 Indexed<TestEntry> entry = reader.next();
260 assertEquals(i - 1, entry.index());
261 assertTrue(reader.hasNext());
262 entry = reader.next();
263 assertEquals(i, entry.index());
268 public void testWriteReadEntries() throws Exception {
269 try (Journal<TestEntry> journal = createJournal()) {
270 JournalWriter<TestEntry> writer = journal.writer();
271 JournalReader<TestEntry> reader = journal.openReader(1);
273 for (int i = 1; i <= entriesPerSegment * 5; i++) {
274 writer.append(ENTRY);
275 assertTrue(reader.hasNext());
276 Indexed<TestEntry> entry;
277 entry = reader.next();
278 assertEquals(i, entry.index());
279 assertEquals(32, entry.entry().bytes().length);
281 entry = reader.next();
282 assertEquals(i, entry.index());
283 assertEquals(32, entry.entry().bytes().length);
287 assertNotNull(reader.getCurrentEntry());
288 assertEquals(i - 6, reader.getCurrentIndex());
289 assertEquals(i - 6, reader.getCurrentEntry().index());
290 assertEquals(i - 5, reader.getNextIndex());
294 writer.truncate(i - 1);
295 writer.append(ENTRY);
297 assertTrue(reader.hasNext());
299 assertTrue(reader.hasNext());
300 entry = reader.next();
301 assertEquals(i, entry.index());
302 assertEquals(32, entry.entry().bytes().length);
308 public void testWriteReadCommittedEntries() throws Exception {
309 try (Journal<TestEntry> journal = createJournal()) {
310 JournalWriter<TestEntry> writer = journal.writer();
311 JournalReader<TestEntry> reader = journal.openReader(1, JournalReader.Mode.COMMITS);
313 for (int i = 1; i <= entriesPerSegment * 5; i++) {
314 writer.append(ENTRY);
315 assertFalse(reader.hasNext());
317 assertTrue(reader.hasNext());
318 Indexed<TestEntry> entry;
319 entry = reader.next();
320 assertEquals(i, entry.index());
321 assertEquals(32, entry.entry().bytes().length);
323 entry = reader.next();
324 assertEquals(i, entry.index());
325 assertEquals(32, entry.entry().bytes().length);
331 public void testReadAfterCompact() throws Exception {
332 try (SegmentedJournal<TestEntry> journal = createJournal()) {
333 JournalWriter<TestEntry> writer = journal.writer();
334 JournalReader<TestEntry> uncommittedReader = journal.openReader(1, JournalReader.Mode.ALL);
335 JournalReader<TestEntry> committedReader = journal.openReader(1, JournalReader.Mode.COMMITS);
337 for (int i = 1; i <= entriesPerSegment * 10; i++) {
338 assertEquals(i, writer.append(ENTRY).index());
341 assertEquals(1, uncommittedReader.getNextIndex());
342 assertTrue(uncommittedReader.hasNext());
343 assertEquals(1, committedReader.getNextIndex());
344 assertFalse(committedReader.hasNext());
346 writer.commit(entriesPerSegment * 9);
348 assertTrue(uncommittedReader.hasNext());
349 assertTrue(committedReader.hasNext());
351 for (int i = 1; i <= entriesPerSegment * 2.5; i++) {
352 assertEquals(i, uncommittedReader.next().index());
353 assertEquals(i, committedReader.next().index());
356 journal.compact(entriesPerSegment * 5 + 1);
358 assertNull(uncommittedReader.getCurrentEntry());
359 assertEquals(0, uncommittedReader.getCurrentIndex());
360 assertTrue(uncommittedReader.hasNext());
361 assertEquals(entriesPerSegment * 5 + 1, uncommittedReader.getNextIndex());
362 assertEquals(entriesPerSegment * 5 + 1, uncommittedReader.next().index());
364 assertNull(committedReader.getCurrentEntry());
365 assertEquals(0, committedReader.getCurrentIndex());
366 assertTrue(committedReader.hasNext());
367 assertEquals(entriesPerSegment * 5 + 1, committedReader.getNextIndex());
368 assertEquals(entriesPerSegment * 5 + 1, committedReader.next().index());
374 public void cleanupStorage() throws IOException {
375 if (Files.exists(PATH)) {
376 Files.walkFileTree(PATH, new SimpleFileVisitor<Path>() {
378 public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
380 return FileVisitResult.CONTINUE;
384 public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
386 return FileVisitResult.CONTINUE;