2 * Copyright (c) 2019 Pantheon Technologies, s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.akka.segjournal;
10 import static org.junit.jupiter.api.Assertions.assertEquals;
11 import static org.junit.jupiter.api.Assertions.assertFalse;
12 import static org.junit.jupiter.api.Assertions.assertNull;
13 import static org.junit.jupiter.api.Assertions.assertTrue;
14 import static org.mockito.ArgumentMatchers.any;
15 import static org.mockito.Mockito.reset;
16 import static org.mockito.Mockito.times;
17 import static org.mockito.Mockito.verify;
19 import akka.actor.ActorRef;
20 import akka.actor.ActorSystem;
21 import akka.actor.PoisonPill;
22 import akka.persistence.AtomicWrite;
23 import akka.persistence.PersistentRepr;
24 import akka.testkit.CallingThreadDispatcher;
25 import akka.testkit.javadsl.TestKit;
26 import io.atomix.storage.journal.StorageLevel;
28 import java.io.IOException;
29 import java.io.Serializable;
30 import java.nio.file.Files;
31 import java.nio.file.Path;
32 import java.util.ArrayList;
33 import java.util.List;
34 import java.util.Optional;
35 import java.util.function.Consumer;
36 import java.util.stream.Collectors;
37 import org.apache.commons.io.FileUtils;
38 import org.junit.jupiter.api.AfterAll;
39 import org.junit.jupiter.api.AfterEach;
40 import org.junit.jupiter.api.BeforeAll;
41 import org.junit.jupiter.api.BeforeEach;
42 import org.junit.jupiter.api.Test;
43 import org.junit.jupiter.api.extension.ExtendWith;
44 import org.mockito.Mock;
45 import org.mockito.junit.jupiter.MockitoExtension;
46 import org.opendaylight.controller.akka.segjournal.SegmentedJournalActor.AsyncMessage;
47 import org.opendaylight.controller.akka.segjournal.SegmentedJournalActor.WriteMessages;
48 import scala.concurrent.Future;
50 @ExtendWith(MockitoExtension.class)
51 class SegmentedFileJournalTest {
52 private static final File DIRECTORY = new File("target/sfj-test");
53 private static final int SEGMENT_SIZE = 1024 * 1024;
54 private static final int MESSAGE_SIZE = 512 * 1024;
55 private static final int FLUSH_SIZE = 16 * 1024;
57 private static ActorSystem SYSTEM;
60 private Consumer<PersistentRepr> firstCallback;
63 private ActorRef actor;
66 static void beforeClass() {
67 SYSTEM = ActorSystem.create("test");
71 static void afterClass() {
72 TestKit.shutdownActorSystem(SYSTEM);
78 kit = new TestKit(SYSTEM);
79 FileUtils.deleteQuietly(DIRECTORY);
85 actor.tell(PoisonPill.getInstance(), ActorRef.noSender());
86 FileUtils.deleteQuietly(DIRECTORY);
90 void testDeleteAfterStop() {
92 final WriteMessages write = new WriteMessages();
93 final Future<Optional<Exception>> first = write.add(AtomicWrite.apply(PersistentRepr.apply("first", 1, "foo",
94 null, false, kit.getRef(), "uuid")));
95 final Future<Optional<Exception>> second = write.add(AtomicWrite.apply(PersistentRepr.apply("second", 2, "foo",
96 null, false, kit.getRef(), "uuid")));
97 actor.tell(write, ActorRef.noSender());
98 assertFalse(getFuture(first).isPresent());
99 assertFalse(getFuture(second).isPresent());
101 assertHighestSequenceNr(2);
102 assertReplayCount(2);
106 assertHighestSequenceNr(2);
107 assertReplayCount(1);
110 actor.tell(PoisonPill.getInstance(), ActorRef.noSender());
113 // Check if state is retained
114 assertHighestSequenceNr(2);
115 assertReplayCount(1);
119 void testSegmentation() throws IOException {
120 // We want to have roughly three segments
121 final LargePayload payload = new LargePayload();
123 final WriteMessages write = new WriteMessages();
124 final List<Future<Optional<Exception>>> requests = new ArrayList<>();
126 // Each payload is half of segment size, plus some overhead, should result in two segments being present
127 for (int i = 1; i <= SEGMENT_SIZE * 3 / MESSAGE_SIZE; ++i) {
128 requests.add(write.add(AtomicWrite.apply(PersistentRepr.apply(payload, i, "foo", null, false, kit.getRef(),
132 actor.tell(write, ActorRef.noSender());
133 requests.forEach(future -> assertFalse(getFuture(future).isPresent()));
135 assertFileCount(2, 1);
137 // Delete all but the last entry
138 deleteEntries(requests.size());
140 assertFileCount(1, 1);
144 void testComplexDeletesAndPartialReplays() throws Exception {
145 for (int i = 0; i <= 4; i++) {
149 assertFileCount(10, 1);
151 // delete including index 3, so get rid of the first segment
153 assertFileCount(9, 1);
155 // get rid of segments 2(index 4-6) and 3(index 7-9)
157 assertFileCount(7, 1);
159 // get rid of all segments except the last one
161 assertFileCount(1, 1);
165 // Check if state is retained
166 assertHighestSequenceNr(30);
168 assertReplayCount(3);
174 assertHighestSequenceNr(30);
176 assertReplayCount(2);
182 assertReplayCount(1);
188 assertReplayCount(0);
191 private void restartActor() {
192 actor.tell(PoisonPill.getInstance(), ActorRef.noSender());
196 private void writeBigPaylod() {
197 final LargePayload payload = new LargePayload();
199 final WriteMessages write = new WriteMessages();
200 final List<Future<Optional<Exception>>> requests = new ArrayList<>();
202 // Each payload is half of segment size, plus some overhead, should result in two segments being present
203 for (int i = 1; i <= SEGMENT_SIZE * 3 / MESSAGE_SIZE; ++i) {
204 requests.add(write.add(AtomicWrite.apply(PersistentRepr.apply(payload, i, "foo", null, false, kit.getRef(),
208 actor.tell(write, ActorRef.noSender());
209 requests.forEach(future -> assertFalse(getFuture(future).isPresent()));
212 private ActorRef actor() {
213 return kit.childActorOf(SegmentedJournalActor.props("foo", DIRECTORY, StorageLevel.DISK, MESSAGE_SIZE,
214 SEGMENT_SIZE, FLUSH_SIZE).withDispatcher(CallingThreadDispatcher.Id()));
217 private void deleteEntries(final long deleteTo) {
218 final AsyncMessage<Void> delete = SegmentedJournalActor.deleteMessagesTo(deleteTo);
219 actor.tell(delete, ActorRef.noSender());
220 assertNull(get(delete));
223 private void assertHighestSequenceNr(final long expected) {
224 AsyncMessage<Long> highest = SegmentedJournalActor.readHighestSequenceNr(0);
225 actor.tell(highest, ActorRef.noSender());
226 assertEquals(expected, (long) get(highest));
229 private void assertReplayCount(final int expected) {
230 // Cast fixes an Eclipse warning 'generic array created'
231 reset((Object) firstCallback);
232 AsyncMessage<Void> replay = SegmentedJournalActor.replayMessages(0, Long.MAX_VALUE, Long.MAX_VALUE,
234 actor.tell(replay, ActorRef.noSender());
235 assertNull(get(replay));
236 verify(firstCallback, times(expected)).accept(any(PersistentRepr.class));
239 private static void assertFileCount(final long dataFiles, final long deleteFiles) throws IOException {
240 List<File> contents = Files.list(DIRECTORY.toPath()).map(Path::toFile).collect(Collectors.toList());
241 assertEquals(dataFiles, contents.stream().filter(file -> file.getName().startsWith("data-")).count());
242 assertEquals(deleteFiles, contents.stream().filter(file -> file.getName().startsWith("delete-")).count());
245 private static <T> T get(final AsyncMessage<T> message) {
246 return getFuture(message.promise.future());
249 private static <T> T getFuture(final Future<T> future) {
250 assertTrue(future.isCompleted());
251 return future.value().get().get();
254 static final class LargePayload implements Serializable {
256 private static final long serialVersionUID = 1L;
258 final byte[] bytes = new byte[MESSAGE_SIZE / 2];