Use ConcurrentHashMap.newKeySet()
[controller.git] / third-party / atomix / storage / src / test / java / io / atomix / storage / journal / AbstractJournalTest.java
1 /*
2  * Copyright 2017-present Open Networking Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package io.atomix.storage.journal;
17
18 import io.atomix.utils.serializer.Namespace;
19 import org.junit.After;
20 import org.junit.Before;
21 import org.junit.Test;
22 import org.junit.runner.RunWith;
23 import org.junit.runners.Parameterized;
24
25 import java.io.IOException;
26 import java.nio.file.FileVisitResult;
27 import java.nio.file.Files;
28 import java.nio.file.Path;
29 import java.nio.file.Paths;
30 import java.nio.file.SimpleFileVisitor;
31 import java.nio.file.attribute.BasicFileAttributes;
32 import java.util.ArrayList;
33 import java.util.List;
34
35 import static org.junit.Assert.assertEquals;
36 import static org.junit.Assert.assertFalse;
37 import static org.junit.Assert.assertNotNull;
38 import static org.junit.Assert.assertNull;
39 import static org.junit.Assert.assertTrue;
40
41 /**
42  * Base journal test.
43  *
44  * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
45  */
46 @RunWith(Parameterized.class)
47 public abstract class AbstractJournalTest {
48   private static final Namespace NAMESPACE = Namespace.builder()
49       .register(TestEntry.class)
50       .register(byte[].class)
51       .build();
52
53   protected static final TestEntry ENTRY = new TestEntry(32);
54   private static final Path PATH = Paths.get("target/test-logs/");
55
56   private final int maxSegmentSize;
57   private final int cacheSize;
58   protected final int entriesPerSegment;
59
60   protected AbstractJournalTest(int maxSegmentSize, int cacheSize) {
61     this.maxSegmentSize = maxSegmentSize;
62     this.cacheSize = cacheSize;
63     int entryLength = (NAMESPACE.serialize(ENTRY).length + 8);
64     this.entriesPerSegment = (maxSegmentSize - 64) / entryLength;
65   }
66
67   protected abstract StorageLevel storageLevel();
68
69   @Parameterized.Parameters
70   public static List<Object[]> primeNumbers() {
71     List<Object[]> runs = new ArrayList<>();
72     for (int i = 1; i <= 10; i++) {
73       for (int j = 1; j <= 10; j++) {
74         runs.add(new Object[]{64 + (i * (NAMESPACE.serialize(ENTRY).length + 8) + j), j});
75       }
76     }
77     return runs;
78   }
79
80   protected SegmentedJournal<TestEntry> createJournal() {
81     return SegmentedJournal.<TestEntry>builder()
82         .withName("test")
83         .withDirectory(PATH.toFile())
84         .withNamespace(NAMESPACE)
85         .withStorageLevel(storageLevel())
86         .withMaxSegmentSize(maxSegmentSize)
87         .withIndexDensity(.2)
88         .withCacheSize(cacheSize)
89         .build();
90   }
91
92   @Test
93   public void testCloseMultipleTimes() {
94     // given
95     final Journal<TestEntry> journal = createJournal();
96
97     // when
98     journal.close();
99
100     // then
101     journal.close();
102   }
103
104   @Test
105   public void testWriteRead() throws Exception {
106     try (Journal<TestEntry> journal = createJournal()) {
107       JournalWriter<TestEntry> writer = journal.writer();
108       JournalReader<TestEntry> reader = journal.openReader(1);
109
110       // Append a couple entries.
111       Indexed<TestEntry> indexed;
112       assertEquals(1, writer.getNextIndex());
113       indexed = writer.append(ENTRY);
114       assertEquals(1, indexed.index());
115
116       assertEquals(2, writer.getNextIndex());
117       writer.append(new Indexed<>(2, ENTRY, 0));
118       reader.reset(2);
119       indexed = reader.next();
120       assertEquals(2, indexed.index());
121       assertFalse(reader.hasNext());
122
123       // Test reading an entry
124       Indexed<TestEntry> entry1;
125       reader.reset();
126       entry1 = reader.next();
127       assertEquals(1, entry1.index());
128       assertEquals(entry1, reader.getCurrentEntry());
129       assertEquals(1, reader.getCurrentIndex());
130
131       // Test reading a second entry
132       Indexed<TestEntry> entry2;
133       assertTrue(reader.hasNext());
134       assertEquals(2, reader.getNextIndex());
135       entry2 = reader.next();
136       assertEquals(2, entry2.index());
137       assertEquals(entry2, reader.getCurrentEntry());
138       assertEquals(2, reader.getCurrentIndex());
139       assertFalse(reader.hasNext());
140
141       // Test opening a new reader and reading from the journal.
142       reader = journal.openReader(1);
143       assertTrue(reader.hasNext());
144       entry1 = reader.next();
145       assertEquals(1, entry1.index());
146       assertEquals(entry1, reader.getCurrentEntry());
147       assertEquals(1, reader.getCurrentIndex());
148       assertTrue(reader.hasNext());
149
150       assertTrue(reader.hasNext());
151       assertEquals(2, reader.getNextIndex());
152       entry2 = reader.next();
153       assertEquals(2, entry2.index());
154       assertEquals(entry2, reader.getCurrentEntry());
155       assertEquals(2, reader.getCurrentIndex());
156       assertFalse(reader.hasNext());
157
158       // Reset the reader.
159       reader.reset();
160
161       // Test opening a new reader and reading from the journal.
162       reader = journal.openReader(1);
163       assertTrue(reader.hasNext());
164       entry1 = reader.next();
165       assertEquals(1, entry1.index());
166       assertEquals(entry1, reader.getCurrentEntry());
167       assertEquals(1, reader.getCurrentIndex());
168       assertTrue(reader.hasNext());
169
170       assertTrue(reader.hasNext());
171       assertEquals(2, reader.getNextIndex());
172       entry2 = reader.next();
173       assertEquals(2, entry2.index());
174       assertEquals(entry2, reader.getCurrentEntry());
175       assertEquals(2, reader.getCurrentIndex());
176       assertFalse(reader.hasNext());
177
178       // Truncate the journal and write a different entry.
179       writer.truncate(1);
180       assertEquals(2, writer.getNextIndex());
181       writer.append(new Indexed<>(2, ENTRY, 0));
182       reader.reset(2);
183       indexed = reader.next();
184       assertEquals(2, indexed.index());
185
186       // Reset the reader to a specific index and read the last entry again.
187       reader.reset(2);
188
189       assertNotNull(reader.getCurrentEntry());
190       assertEquals(1, reader.getCurrentIndex());
191       assertEquals(1, reader.getCurrentEntry().index());
192       assertTrue(reader.hasNext());
193       assertEquals(2, reader.getNextIndex());
194       entry2 = reader.next();
195       assertEquals(2, entry2.index());
196       assertEquals(entry2, reader.getCurrentEntry());
197       assertEquals(2, reader.getCurrentIndex());
198       assertFalse(reader.hasNext());
199     }
200   }
201
202   @Test
203   public void testResetTruncateZero() throws Exception {
204     try (SegmentedJournal<TestEntry> journal = createJournal()) {
205       JournalWriter<TestEntry> writer = journal.writer();
206       JournalReader<TestEntry> reader = journal.openReader(1);
207
208       assertEquals(0, writer.getLastIndex());
209       writer.append(ENTRY);
210       writer.append(ENTRY);
211       writer.reset(1);
212       assertEquals(0, writer.getLastIndex());
213       writer.append(ENTRY);
214       assertEquals(1, reader.next().index());
215       writer.reset(1);
216       assertEquals(0, writer.getLastIndex());
217       writer.append(ENTRY);
218       assertEquals(1, writer.getLastIndex());
219       assertEquals(1, writer.getLastEntry().index());
220
221       assertTrue(reader.hasNext());
222       assertEquals(1, reader.next().index());
223
224       writer.truncate(0);
225       assertEquals(0, writer.getLastIndex());
226       assertNull(writer.getLastEntry());
227       writer.append(ENTRY);
228       assertEquals(1, writer.getLastIndex());
229       assertEquals(1, writer.getLastEntry().index());
230
231       assertTrue(reader.hasNext());
232       assertEquals(1, reader.next().index());
233     }
234   }
235
236   @Test
237   public void testTruncateRead() throws Exception {
238     int i = 10;
239     try (Journal<TestEntry> journal = createJournal()) {
240       JournalWriter<TestEntry> writer = journal.writer();
241       JournalReader<TestEntry> reader = journal.openReader(1);
242
243       for (int j = 1; j <= i; j++) {
244         assertEquals(j, writer.append(new TestEntry(32)).index());
245       }
246
247       for (int j = 1; j <= i - 2; j++) {
248         assertTrue(reader.hasNext());
249         assertEquals(j, reader.next().index());
250       }
251
252       writer.truncate(i - 2);
253
254       assertFalse(reader.hasNext());
255       assertEquals(i - 1, writer.append(new TestEntry(32)).index());
256       assertEquals(i, writer.append(new TestEntry(32)).index());
257
258       assertTrue(reader.hasNext());
259       Indexed<TestEntry> entry = reader.next();
260       assertEquals(i - 1, entry.index());
261       assertTrue(reader.hasNext());
262       entry = reader.next();
263       assertEquals(i, entry.index());
264     }
265   }
266
267   @Test
268   public void testWriteReadEntries() throws Exception {
269     try (Journal<TestEntry> journal = createJournal()) {
270       JournalWriter<TestEntry> writer = journal.writer();
271       JournalReader<TestEntry> reader = journal.openReader(1);
272
273       for (int i = 1; i <= entriesPerSegment * 5; i++) {
274         writer.append(ENTRY);
275         assertTrue(reader.hasNext());
276         Indexed<TestEntry> entry;
277         entry = reader.next();
278         assertEquals(i, entry.index());
279         assertEquals(32, entry.entry().bytes().length);
280         reader.reset(i);
281         entry = reader.next();
282         assertEquals(i, entry.index());
283         assertEquals(32, entry.entry().bytes().length);
284
285         if (i > 6) {
286           reader.reset(i - 5);
287           assertNotNull(reader.getCurrentEntry());
288           assertEquals(i - 6, reader.getCurrentIndex());
289           assertEquals(i - 6, reader.getCurrentEntry().index());
290           assertEquals(i - 5, reader.getNextIndex());
291           reader.reset(i + 1);
292         }
293
294         writer.truncate(i - 1);
295         writer.append(ENTRY);
296
297         assertTrue(reader.hasNext());
298         reader.reset(i);
299         assertTrue(reader.hasNext());
300         entry = reader.next();
301         assertEquals(i, entry.index());
302         assertEquals(32, entry.entry().bytes().length);
303       }
304     }
305   }
306
307   @Test
308   public void testWriteReadCommittedEntries() throws Exception {
309     try (Journal<TestEntry> journal = createJournal()) {
310       JournalWriter<TestEntry> writer = journal.writer();
311       JournalReader<TestEntry> reader = journal.openReader(1, JournalReader.Mode.COMMITS);
312
313       for (int i = 1; i <= entriesPerSegment * 5; i++) {
314         writer.append(ENTRY);
315         assertFalse(reader.hasNext());
316         writer.commit(i);
317         assertTrue(reader.hasNext());
318         Indexed<TestEntry> entry;
319         entry = reader.next();
320         assertEquals(i, entry.index());
321         assertEquals(32, entry.entry().bytes().length);
322         reader.reset(i);
323         entry = reader.next();
324         assertEquals(i, entry.index());
325         assertEquals(32, entry.entry().bytes().length);
326       }
327     }
328   }
329
330   @Test
331   public void testReadAfterCompact() throws Exception {
332     try (SegmentedJournal<TestEntry> journal = createJournal()) {
333       JournalWriter<TestEntry> writer = journal.writer();
334       JournalReader<TestEntry> uncommittedReader = journal.openReader(1, JournalReader.Mode.ALL);
335       JournalReader<TestEntry> committedReader = journal.openReader(1, JournalReader.Mode.COMMITS);
336
337       for (int i = 1; i <= entriesPerSegment * 10; i++) {
338         assertEquals(i, writer.append(ENTRY).index());
339       }
340
341       assertEquals(1, uncommittedReader.getNextIndex());
342       assertTrue(uncommittedReader.hasNext());
343       assertEquals(1, committedReader.getNextIndex());
344       assertFalse(committedReader.hasNext());
345
346       writer.commit(entriesPerSegment * 9);
347
348       assertTrue(uncommittedReader.hasNext());
349       assertTrue(committedReader.hasNext());
350
351       for (int i = 1; i <= entriesPerSegment * 2.5; i++) {
352         assertEquals(i, uncommittedReader.next().index());
353         assertEquals(i, committedReader.next().index());
354       }
355
356       journal.compact(entriesPerSegment * 5 + 1);
357
358       assertNull(uncommittedReader.getCurrentEntry());
359       assertEquals(0, uncommittedReader.getCurrentIndex());
360       assertTrue(uncommittedReader.hasNext());
361       assertEquals(entriesPerSegment * 5 + 1, uncommittedReader.getNextIndex());
362       assertEquals(entriesPerSegment * 5 + 1, uncommittedReader.next().index());
363
364       assertNull(committedReader.getCurrentEntry());
365       assertEquals(0, committedReader.getCurrentIndex());
366       assertTrue(committedReader.hasNext());
367       assertEquals(entriesPerSegment * 5 + 1, committedReader.getNextIndex());
368       assertEquals(entriesPerSegment * 5 + 1, committedReader.next().index());
369     }
370   }
371
372   @Before
373   @After
374   public void cleanupStorage() throws IOException {
375     if (Files.exists(PATH)) {
376       Files.walkFileTree(PATH, new SimpleFileVisitor<Path>() {
377         @Override
378         public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
379           Files.delete(file);
380           return FileVisitResult.CONTINUE;
381         }
382
383         @Override
384         public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
385           Files.delete(dir);
386           return FileVisitResult.CONTINUE;
387         }
388       });
389     }
390   }
391 }