Do not fall back to default Kryo serializers
[controller.git] / third-party / atomix / storage / src / test / java / io / atomix / storage / journal / AbstractJournalTest.java
1 /*
2  * Copyright 2017-present Open Networking Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package io.atomix.storage.journal;
17
18 import com.esotericsoftware.kryo.serializers.DefaultArraySerializers.ByteArraySerializer;
19 import io.atomix.utils.serializer.Namespace;
20 import org.junit.After;
21 import org.junit.Before;
22 import org.junit.Test;
23 import org.junit.runner.RunWith;
24 import org.junit.runners.Parameterized;
25
26 import java.io.IOException;
27 import java.nio.file.FileVisitResult;
28 import java.nio.file.Files;
29 import java.nio.file.Path;
30 import java.nio.file.Paths;
31 import java.nio.file.SimpleFileVisitor;
32 import java.nio.file.attribute.BasicFileAttributes;
33 import java.util.ArrayList;
34 import java.util.List;
35
36 import static org.junit.Assert.assertEquals;
37 import static org.junit.Assert.assertFalse;
38 import static org.junit.Assert.assertNotNull;
39 import static org.junit.Assert.assertNull;
40 import static org.junit.Assert.assertTrue;
41
42 /**
43  * Base journal test.
44  *
45  * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
46  */
47 @RunWith(Parameterized.class)
48 public abstract class AbstractJournalTest {
49   private static final Namespace NAMESPACE = Namespace.builder()
50       .register(new TestEntrySerializer(), TestEntry.class)
51       .register(new ByteArraySerializer(), byte[].class)
52       .build();
53
54   protected static final TestEntry ENTRY = new TestEntry(32);
55   private static final Path PATH = Paths.get("target/test-logs/");
56
57   private final int maxSegmentSize;
58   protected final int entriesPerSegment;
59
60   protected AbstractJournalTest(int maxSegmentSize) {
61     this.maxSegmentSize = maxSegmentSize;
62     int entryLength = (NAMESPACE.serialize(ENTRY).length + 8);
63     this.entriesPerSegment = (maxSegmentSize - 64) / entryLength;
64   }
65
66   protected abstract StorageLevel storageLevel();
67
68   @Parameterized.Parameters
69   public static List<Object[]> primeNumbers() {
70     List<Object[]> runs = new ArrayList<>();
71     for (int i = 1; i <= 10; i++) {
72       for (int j = 1; j <= 10; j++) {
73         runs.add(new Object[]{64 + (i * (NAMESPACE.serialize(ENTRY).length + 8) + j)});
74       }
75     }
76     return runs;
77   }
78
79   protected SegmentedJournal<TestEntry> createJournal() {
80     return SegmentedJournal.<TestEntry>builder()
81         .withName("test")
82         .withDirectory(PATH.toFile())
83         .withNamespace(NAMESPACE)
84         .withStorageLevel(storageLevel())
85         .withMaxSegmentSize(maxSegmentSize)
86         .withIndexDensity(.2)
87         .build();
88   }
89
90   @Test
91   public void testCloseMultipleTimes() {
92     // given
93     final Journal<TestEntry> journal = createJournal();
94
95     // when
96     journal.close();
97
98     // then
99     journal.close();
100   }
101
102   @Test
103   public void testWriteRead() throws Exception {
104     try (Journal<TestEntry> journal = createJournal()) {
105       JournalWriter<TestEntry> writer = journal.writer();
106       JournalReader<TestEntry> reader = journal.openReader(1);
107
108       // Append a couple entries.
109       Indexed<TestEntry> indexed;
110       assertEquals(1, writer.getNextIndex());
111       indexed = writer.append(ENTRY);
112       assertEquals(1, indexed.index());
113
114       assertEquals(2, writer.getNextIndex());
115       writer.append(new Indexed<>(2, ENTRY, 0));
116       reader.reset(2);
117       indexed = reader.next();
118       assertEquals(2, indexed.index());
119       assertFalse(reader.hasNext());
120
121       // Test reading an entry
122       Indexed<TestEntry> entry1;
123       reader.reset();
124       entry1 = reader.next();
125       assertEquals(1, entry1.index());
126       assertEquals(entry1, reader.getCurrentEntry());
127       assertEquals(1, reader.getCurrentIndex());
128
129       // Test reading a second entry
130       Indexed<TestEntry> entry2;
131       assertTrue(reader.hasNext());
132       assertEquals(2, reader.getNextIndex());
133       entry2 = reader.next();
134       assertEquals(2, entry2.index());
135       assertEquals(entry2, reader.getCurrentEntry());
136       assertEquals(2, reader.getCurrentIndex());
137       assertFalse(reader.hasNext());
138
139       // Test opening a new reader and reading from the journal.
140       reader = journal.openReader(1);
141       assertTrue(reader.hasNext());
142       entry1 = reader.next();
143       assertEquals(1, entry1.index());
144       assertEquals(entry1, reader.getCurrentEntry());
145       assertEquals(1, reader.getCurrentIndex());
146       assertTrue(reader.hasNext());
147
148       assertTrue(reader.hasNext());
149       assertEquals(2, reader.getNextIndex());
150       entry2 = reader.next();
151       assertEquals(2, entry2.index());
152       assertEquals(entry2, reader.getCurrentEntry());
153       assertEquals(2, reader.getCurrentIndex());
154       assertFalse(reader.hasNext());
155
156       // Reset the reader.
157       reader.reset();
158
159       // Test opening a new reader and reading from the journal.
160       reader = journal.openReader(1);
161       assertTrue(reader.hasNext());
162       entry1 = reader.next();
163       assertEquals(1, entry1.index());
164       assertEquals(entry1, reader.getCurrentEntry());
165       assertEquals(1, reader.getCurrentIndex());
166       assertTrue(reader.hasNext());
167
168       assertTrue(reader.hasNext());
169       assertEquals(2, reader.getNextIndex());
170       entry2 = reader.next();
171       assertEquals(2, entry2.index());
172       assertEquals(entry2, reader.getCurrentEntry());
173       assertEquals(2, reader.getCurrentIndex());
174       assertFalse(reader.hasNext());
175
176       // Truncate the journal and write a different entry.
177       writer.truncate(1);
178       assertEquals(2, writer.getNextIndex());
179       writer.append(new Indexed<>(2, ENTRY, 0));
180       reader.reset(2);
181       indexed = reader.next();
182       assertEquals(2, indexed.index());
183
184       // Reset the reader to a specific index and read the last entry again.
185       reader.reset(2);
186
187       assertNotNull(reader.getCurrentEntry());
188       assertEquals(1, reader.getCurrentIndex());
189       assertEquals(1, reader.getCurrentEntry().index());
190       assertTrue(reader.hasNext());
191       assertEquals(2, reader.getNextIndex());
192       entry2 = reader.next();
193       assertEquals(2, entry2.index());
194       assertEquals(entry2, reader.getCurrentEntry());
195       assertEquals(2, reader.getCurrentIndex());
196       assertFalse(reader.hasNext());
197     }
198   }
199
200   @Test
201   public void testResetTruncateZero() throws Exception {
202     try (SegmentedJournal<TestEntry> journal = createJournal()) {
203       JournalWriter<TestEntry> writer = journal.writer();
204       JournalReader<TestEntry> reader = journal.openReader(1);
205
206       assertEquals(0, writer.getLastIndex());
207       writer.append(ENTRY);
208       writer.append(ENTRY);
209       writer.reset(1);
210       assertEquals(0, writer.getLastIndex());
211       writer.append(ENTRY);
212       assertEquals(1, reader.next().index());
213       writer.reset(1);
214       assertEquals(0, writer.getLastIndex());
215       writer.append(ENTRY);
216       assertEquals(1, writer.getLastIndex());
217       assertEquals(1, writer.getLastEntry().index());
218
219       assertTrue(reader.hasNext());
220       assertEquals(1, reader.next().index());
221
222       writer.truncate(0);
223       assertEquals(0, writer.getLastIndex());
224       assertNull(writer.getLastEntry());
225       writer.append(ENTRY);
226       assertEquals(1, writer.getLastIndex());
227       assertEquals(1, writer.getLastEntry().index());
228
229       assertTrue(reader.hasNext());
230       assertEquals(1, reader.next().index());
231     }
232   }
233
234   @Test
235   public void testTruncateRead() throws Exception {
236     int i = 10;
237     try (Journal<TestEntry> journal = createJournal()) {
238       JournalWriter<TestEntry> writer = journal.writer();
239       JournalReader<TestEntry> reader = journal.openReader(1);
240
241       for (int j = 1; j <= i; j++) {
242         assertEquals(j, writer.append(new TestEntry(32)).index());
243       }
244
245       for (int j = 1; j <= i - 2; j++) {
246         assertTrue(reader.hasNext());
247         assertEquals(j, reader.next().index());
248       }
249
250       writer.truncate(i - 2);
251
252       assertFalse(reader.hasNext());
253       assertEquals(i - 1, writer.append(new TestEntry(32)).index());
254       assertEquals(i, writer.append(new TestEntry(32)).index());
255
256       assertTrue(reader.hasNext());
257       Indexed<TestEntry> entry = reader.next();
258       assertEquals(i - 1, entry.index());
259       assertTrue(reader.hasNext());
260       entry = reader.next();
261       assertEquals(i, entry.index());
262     }
263   }
264
265   @Test
266   public void testWriteReadEntries() throws Exception {
267     try (Journal<TestEntry> journal = createJournal()) {
268       JournalWriter<TestEntry> writer = journal.writer();
269       JournalReader<TestEntry> reader = journal.openReader(1);
270
271       for (int i = 1; i <= entriesPerSegment * 5; i++) {
272         writer.append(ENTRY);
273         assertTrue(reader.hasNext());
274         Indexed<TestEntry> entry;
275         entry = reader.next();
276         assertEquals(i, entry.index());
277         assertEquals(32, entry.entry().bytes().length);
278         reader.reset(i);
279         entry = reader.next();
280         assertEquals(i, entry.index());
281         assertEquals(32, entry.entry().bytes().length);
282
283         if (i > 6) {
284           reader.reset(i - 5);
285           assertNotNull(reader.getCurrentEntry());
286           assertEquals(i - 6, reader.getCurrentIndex());
287           assertEquals(i - 6, reader.getCurrentEntry().index());
288           assertEquals(i - 5, reader.getNextIndex());
289           reader.reset(i + 1);
290         }
291
292         writer.truncate(i - 1);
293         writer.append(ENTRY);
294
295         assertTrue(reader.hasNext());
296         reader.reset(i);
297         assertTrue(reader.hasNext());
298         entry = reader.next();
299         assertEquals(i, entry.index());
300         assertEquals(32, entry.entry().bytes().length);
301       }
302     }
303   }
304
305   @Test
306   public void testWriteReadCommittedEntries() throws Exception {
307     try (Journal<TestEntry> journal = createJournal()) {
308       JournalWriter<TestEntry> writer = journal.writer();
309       JournalReader<TestEntry> reader = journal.openReader(1, JournalReader.Mode.COMMITS);
310
311       for (int i = 1; i <= entriesPerSegment * 5; i++) {
312         writer.append(ENTRY);
313         assertFalse(reader.hasNext());
314         writer.commit(i);
315         assertTrue(reader.hasNext());
316         Indexed<TestEntry> entry;
317         entry = reader.next();
318         assertEquals(i, entry.index());
319         assertEquals(32, entry.entry().bytes().length);
320         reader.reset(i);
321         entry = reader.next();
322         assertEquals(i, entry.index());
323         assertEquals(32, entry.entry().bytes().length);
324       }
325     }
326   }
327
328   @Test
329   public void testReadAfterCompact() throws Exception {
330     try (SegmentedJournal<TestEntry> journal = createJournal()) {
331       JournalWriter<TestEntry> writer = journal.writer();
332       JournalReader<TestEntry> uncommittedReader = journal.openReader(1, JournalReader.Mode.ALL);
333       JournalReader<TestEntry> committedReader = journal.openReader(1, JournalReader.Mode.COMMITS);
334
335       for (int i = 1; i <= entriesPerSegment * 10; i++) {
336         assertEquals(i, writer.append(ENTRY).index());
337       }
338
339       assertEquals(1, uncommittedReader.getNextIndex());
340       assertTrue(uncommittedReader.hasNext());
341       assertEquals(1, committedReader.getNextIndex());
342       assertFalse(committedReader.hasNext());
343
344       writer.commit(entriesPerSegment * 9);
345
346       assertTrue(uncommittedReader.hasNext());
347       assertTrue(committedReader.hasNext());
348
349       for (int i = 1; i <= entriesPerSegment * 2.5; i++) {
350         assertEquals(i, uncommittedReader.next().index());
351         assertEquals(i, committedReader.next().index());
352       }
353
354       journal.compact(entriesPerSegment * 5 + 1);
355
356       assertNull(uncommittedReader.getCurrentEntry());
357       assertEquals(0, uncommittedReader.getCurrentIndex());
358       assertTrue(uncommittedReader.hasNext());
359       assertEquals(entriesPerSegment * 5 + 1, uncommittedReader.getNextIndex());
360       assertEquals(entriesPerSegment * 5 + 1, uncommittedReader.next().index());
361
362       assertNull(committedReader.getCurrentEntry());
363       assertEquals(0, committedReader.getCurrentIndex());
364       assertTrue(committedReader.hasNext());
365       assertEquals(entriesPerSegment * 5 + 1, committedReader.getNextIndex());
366       assertEquals(entriesPerSegment * 5 + 1, committedReader.next().index());
367     }
368   }
369
370   @Before
371   @After
372   public void cleanupStorage() throws IOException {
373     if (Files.exists(PATH)) {
374       Files.walkFileTree(PATH, new SimpleFileVisitor<Path>() {
375         @Override
376         public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
377           Files.delete(file);
378           return FileVisitResult.CONTINUE;
379         }
380
381         @Override
382         public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
383           Files.delete(dir);
384           return FileVisitResult.CONTINUE;
385         }
386       });
387     }
388   }
389 }