Import atomix/{storage,utils}
[controller.git] / third-party / atomix / storage / src / test / java / io / atomix / storage / journal / AbstractJournalTest.java
1 /*
2  * Copyright 2017-present Open Networking Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package io.atomix.storage.journal;
17
18 import io.atomix.storage.StorageLevel;
19 import io.atomix.utils.serializer.Namespace;
20 import org.junit.After;
21 import org.junit.Before;
22 import org.junit.Test;
23 import org.junit.runner.RunWith;
24 import org.junit.runners.Parameterized;
25
26 import java.io.IOException;
27 import java.nio.file.FileVisitResult;
28 import java.nio.file.Files;
29 import java.nio.file.Path;
30 import java.nio.file.Paths;
31 import java.nio.file.SimpleFileVisitor;
32 import java.nio.file.attribute.BasicFileAttributes;
33 import java.util.ArrayList;
34 import java.util.Collection;
35 import java.util.List;
36
37 import static org.junit.Assert.assertEquals;
38 import static org.junit.Assert.assertFalse;
39 import static org.junit.Assert.assertNotNull;
40 import static org.junit.Assert.assertNull;
41 import static org.junit.Assert.assertTrue;
42
43 /**
44  * Base journal test.
45  *
46  * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
47  */
48 @RunWith(Parameterized.class)
49 public abstract class AbstractJournalTest {
50   private static final Namespace NAMESPACE = Namespace.builder()
51       .register(TestEntry.class)
52       .register(byte[].class)
53       .build();
54
55   protected static final TestEntry ENTRY = new TestEntry(32);
56   private static final Path PATH = Paths.get("target/test-logs/");
57
58   private final int maxSegmentSize;
59   private final int cacheSize;
60   protected final int entriesPerSegment;
61
62   protected AbstractJournalTest(int maxSegmentSize, int cacheSize) {
63     this.maxSegmentSize = maxSegmentSize;
64     this.cacheSize = cacheSize;
65     int entryLength = (NAMESPACE.serialize(ENTRY).length + 8);
66     this.entriesPerSegment = (maxSegmentSize - 64) / entryLength;
67   }
68
69   protected abstract StorageLevel storageLevel();
70
71   @Parameterized.Parameters
72   public static Collection primeNumbers() {
73     List<Object[]> runs = new ArrayList<>();
74     for (int i = 1; i <= 10; i++) {
75       for (int j = 1; j <= 10; j++) {
76         runs.add(new Object[]{64 + (i * (NAMESPACE.serialize(ENTRY).length + 8) + j), j});
77       }
78     }
79     return runs;
80   }
81
82   protected SegmentedJournal<TestEntry> createJournal() {
83     return SegmentedJournal.<TestEntry>builder()
84         .withName("test")
85         .withDirectory(PATH.toFile())
86         .withNamespace(NAMESPACE)
87         .withStorageLevel(storageLevel())
88         .withMaxSegmentSize(maxSegmentSize)
89         .withIndexDensity(.2)
90         .withCacheSize(cacheSize)
91         .build();
92   }
93
94   @Test
95   public void testCloseMultipleTimes() {
96     // given
97     final Journal<TestEntry> journal = createJournal();
98
99     // when
100     journal.close();
101
102     // then
103     journal.close();
104   }
105
106   @Test
107   @SuppressWarnings("unchecked")
108   public void testWriteRead() throws Exception {
109     try (Journal<TestEntry> journal = createJournal()) {
110       JournalWriter<TestEntry> writer = journal.writer();
111       JournalReader<TestEntry> reader = journal.openReader(1);
112
113       // Append a couple entries.
114       Indexed<TestEntry> indexed;
115       assertEquals(1, writer.getNextIndex());
116       indexed = writer.append(ENTRY);
117       assertEquals(1, indexed.index());
118
119       assertEquals(2, writer.getNextIndex());
120       writer.append(new Indexed<>(2, ENTRY, 0));
121       reader.reset(2);
122       indexed = reader.next();
123       assertEquals(2, indexed.index());
124       assertFalse(reader.hasNext());
125
126       // Test reading an entry
127       Indexed<TestEntry> entry1;
128       reader.reset();
129       entry1 = (Indexed) reader.next();
130       assertEquals(1, entry1.index());
131       assertEquals(entry1, reader.getCurrentEntry());
132       assertEquals(1, reader.getCurrentIndex());
133
134       // Test reading a second entry
135       Indexed<TestEntry> entry2;
136       assertTrue(reader.hasNext());
137       assertEquals(2, reader.getNextIndex());
138       entry2 = (Indexed) reader.next();
139       assertEquals(2, entry2.index());
140       assertEquals(entry2, reader.getCurrentEntry());
141       assertEquals(2, reader.getCurrentIndex());
142       assertFalse(reader.hasNext());
143
144       // Test opening a new reader and reading from the journal.
145       reader = journal.openReader(1);
146       assertTrue(reader.hasNext());
147       entry1 = (Indexed) reader.next();
148       assertEquals(1, entry1.index());
149       assertEquals(entry1, reader.getCurrentEntry());
150       assertEquals(1, reader.getCurrentIndex());
151       assertTrue(reader.hasNext());
152
153       assertTrue(reader.hasNext());
154       assertEquals(2, reader.getNextIndex());
155       entry2 = (Indexed) reader.next();
156       assertEquals(2, entry2.index());
157       assertEquals(entry2, reader.getCurrentEntry());
158       assertEquals(2, reader.getCurrentIndex());
159       assertFalse(reader.hasNext());
160
161       // Reset the reader.
162       reader.reset();
163
164       // Test opening a new reader and reading from the journal.
165       reader = journal.openReader(1);
166       assertTrue(reader.hasNext());
167       entry1 = (Indexed) reader.next();
168       assertEquals(1, entry1.index());
169       assertEquals(entry1, reader.getCurrentEntry());
170       assertEquals(1, reader.getCurrentIndex());
171       assertTrue(reader.hasNext());
172
173       assertTrue(reader.hasNext());
174       assertEquals(2, reader.getNextIndex());
175       entry2 = (Indexed) reader.next();
176       assertEquals(2, entry2.index());
177       assertEquals(entry2, reader.getCurrentEntry());
178       assertEquals(2, reader.getCurrentIndex());
179       assertFalse(reader.hasNext());
180
181       // Truncate the journal and write a different entry.
182       writer.truncate(1);
183       assertEquals(2, writer.getNextIndex());
184       writer.append(new Indexed<>(2, ENTRY, 0));
185       reader.reset(2);
186       indexed = reader.next();
187       assertEquals(2, indexed.index());
188
189       // Reset the reader to a specific index and read the last entry again.
190       reader.reset(2);
191
192       assertNotNull(reader.getCurrentEntry());
193       assertEquals(1, reader.getCurrentIndex());
194       assertEquals(1, reader.getCurrentEntry().index());
195       assertTrue(reader.hasNext());
196       assertEquals(2, reader.getNextIndex());
197       entry2 = (Indexed) reader.next();
198       assertEquals(2, entry2.index());
199       assertEquals(entry2, reader.getCurrentEntry());
200       assertEquals(2, reader.getCurrentIndex());
201       assertFalse(reader.hasNext());
202     }
203   }
204
205   @Test
206   public void testResetTruncateZero() throws Exception {
207     try (SegmentedJournal<TestEntry> journal = createJournal()) {
208       JournalWriter<TestEntry> writer = journal.writer();
209       JournalReader<TestEntry> reader = journal.openReader(1);
210
211       assertEquals(0, writer.getLastIndex());
212       writer.append(ENTRY);
213       writer.append(ENTRY);
214       writer.reset(1);
215       assertEquals(0, writer.getLastIndex());
216       writer.append(ENTRY);
217       assertEquals(1, reader.next().index());
218       writer.reset(1);
219       assertEquals(0, writer.getLastIndex());
220       writer.append(ENTRY);
221       assertEquals(1, writer.getLastIndex());
222       assertEquals(1, writer.getLastEntry().index());
223
224       assertTrue(reader.hasNext());
225       assertEquals(1, reader.next().index());
226
227       writer.truncate(0);
228       assertEquals(0, writer.getLastIndex());
229       assertNull(writer.getLastEntry());
230       writer.append(ENTRY);
231       assertEquals(1, writer.getLastIndex());
232       assertEquals(1, writer.getLastEntry().index());
233
234       assertTrue(reader.hasNext());
235       assertEquals(1, reader.next().index());
236     }
237   }
238
239   @Test
240   public void testTruncateRead() throws Exception {
241     int i = 10;
242     try (Journal<TestEntry> journal = createJournal()) {
243       JournalWriter<TestEntry> writer = journal.writer();
244       JournalReader<TestEntry> reader = journal.openReader(1);
245
246       for (int j = 1; j <= i; j++) {
247         assertEquals(j, writer.append(new TestEntry(32)).index());
248       }
249
250       for (int j = 1; j <= i - 2; j++) {
251         assertTrue(reader.hasNext());
252         assertEquals(j, reader.next().index());
253       }
254
255       writer.truncate(i - 2);
256
257       assertFalse(reader.hasNext());
258       assertEquals(i - 1, writer.append(new TestEntry(32)).index());
259       assertEquals(i, writer.append(new TestEntry(32)).index());
260
261       assertTrue(reader.hasNext());
262       Indexed<TestEntry> entry = reader.next();
263       assertEquals(i - 1, entry.index());
264       assertTrue(reader.hasNext());
265       entry = reader.next();
266       assertEquals(i, entry.index());
267     }
268   }
269
270   @Test
271   @SuppressWarnings("unchecked")
272   public void testWriteReadEntries() throws Exception {
273     try (Journal<TestEntry> journal = createJournal()) {
274       JournalWriter<TestEntry> writer = journal.writer();
275       JournalReader<TestEntry> reader = journal.openReader(1);
276
277       for (int i = 1; i <= entriesPerSegment * 5; i++) {
278         writer.append(ENTRY);
279         assertTrue(reader.hasNext());
280         Indexed<TestEntry> entry;
281         entry = (Indexed) reader.next();
282         assertEquals(i, entry.index());
283         assertEquals(32, entry.entry().bytes().length);
284         reader.reset(i);
285         entry = (Indexed) reader.next();
286         assertEquals(i, entry.index());
287         assertEquals(32, entry.entry().bytes().length);
288
289         if (i > 6) {
290           reader.reset(i - 5);
291           assertNotNull(reader.getCurrentEntry());
292           assertEquals(i - 6, reader.getCurrentIndex());
293           assertEquals(i - 6, reader.getCurrentEntry().index());
294           assertEquals(i - 5, reader.getNextIndex());
295           reader.reset(i + 1);
296         }
297
298         writer.truncate(i - 1);
299         writer.append(ENTRY);
300
301         assertTrue(reader.hasNext());
302         reader.reset(i);
303         assertTrue(reader.hasNext());
304         entry = (Indexed) reader.next();
305         assertEquals(i, entry.index());
306         assertEquals(32, entry.entry().bytes().length);
307       }
308     }
309   }
310
311   @Test
312   @SuppressWarnings("unchecked")
313   public void testWriteReadCommittedEntries() throws Exception {
314     try (Journal<TestEntry> journal = createJournal()) {
315       JournalWriter<TestEntry> writer = journal.writer();
316       JournalReader<TestEntry> reader = journal.openReader(1, JournalReader.Mode.COMMITS);
317
318       for (int i = 1; i <= entriesPerSegment * 5; i++) {
319         writer.append(ENTRY);
320         assertFalse(reader.hasNext());
321         writer.commit(i);
322         assertTrue(reader.hasNext());
323         Indexed<TestEntry> entry;
324         entry = (Indexed) reader.next();
325         assertEquals(i, entry.index());
326         assertEquals(32, entry.entry().bytes().length);
327         reader.reset(i);
328         entry = (Indexed) reader.next();
329         assertEquals(i, entry.index());
330         assertEquals(32, entry.entry().bytes().length);
331       }
332     }
333   }
334
335   @Test
336   public void testReadAfterCompact() throws Exception {
337     try (SegmentedJournal<TestEntry> journal = createJournal()) {
338       JournalWriter<TestEntry> writer = journal.writer();
339       JournalReader<TestEntry> uncommittedReader = journal.openReader(1, JournalReader.Mode.ALL);
340       JournalReader<TestEntry> committedReader = journal.openReader(1, JournalReader.Mode.COMMITS);
341
342       for (int i = 1; i <= entriesPerSegment * 10; i++) {
343         assertEquals(i, writer.append(ENTRY).index());
344       }
345
346       assertEquals(1, uncommittedReader.getNextIndex());
347       assertTrue(uncommittedReader.hasNext());
348       assertEquals(1, committedReader.getNextIndex());
349       assertFalse(committedReader.hasNext());
350
351       writer.commit(entriesPerSegment * 9);
352
353       assertTrue(uncommittedReader.hasNext());
354       assertTrue(committedReader.hasNext());
355
356       for (int i = 1; i <= entriesPerSegment * 2.5; i++) {
357         assertEquals(i, uncommittedReader.next().index());
358         assertEquals(i, committedReader.next().index());
359       }
360
361       journal.compact(entriesPerSegment * 5 + 1);
362
363       assertNull(uncommittedReader.getCurrentEntry());
364       assertEquals(0, uncommittedReader.getCurrentIndex());
365       assertTrue(uncommittedReader.hasNext());
366       assertEquals(entriesPerSegment * 5 + 1, uncommittedReader.getNextIndex());
367       assertEquals(entriesPerSegment * 5 + 1, uncommittedReader.next().index());
368
369       assertNull(committedReader.getCurrentEntry());
370       assertEquals(0, committedReader.getCurrentIndex());
371       assertTrue(committedReader.hasNext());
372       assertEquals(entriesPerSegment * 5 + 1, committedReader.getNextIndex());
373       assertEquals(entriesPerSegment * 5 + 1, committedReader.next().index());
374     }
375   }
376
377   @Before
378   @After
379   public void cleanupStorage() throws IOException {
380     if (Files.exists(PATH)) {
381       Files.walkFileTree(PATH, new SimpleFileVisitor<Path>() {
382         @Override
383         public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
384           Files.delete(file);
385           return FileVisitResult.CONTINUE;
386         }
387
388         @Override
389         public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
390           Files.delete(dir);
391           return FileVisitResult.CONTINUE;
392         }
393       });
394     }
395   }
396 }