Add SegmentedFileJournal
[controller.git] / opendaylight / md-sal / sal-akka-segmented-journal / src / test / java / org / opendaylight / controller / akka / segjournal / SegmentedFileJournalTest.java
diff --git a/opendaylight/md-sal/sal-akka-segmented-journal/src/test/java/org/opendaylight/controller/akka/segjournal/SegmentedFileJournalTest.java b/opendaylight/md-sal/sal-akka-segmented-journal/src/test/java/org/opendaylight/controller/akka/segjournal/SegmentedFileJournalTest.java
new file mode 100644 (file)
index 0000000..6939dbc
--- /dev/null
@@ -0,0 +1,183 @@
+/*
+ * Copyright (c) 2019 Pantheon Technologies, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.akka.segjournal;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
+import akka.persistence.AtomicWrite;
+import akka.persistence.PersistentRepr;
+import akka.testkit.CallingThreadDispatcher;
+import akka.testkit.javadsl.TestKit;
+import io.atomix.storage.StorageLevel;
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import org.apache.commons.io.FileUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.opendaylight.controller.akka.segjournal.SegmentedJournalActor.AsyncMessage;
+import org.opendaylight.controller.akka.segjournal.SegmentedJournalActor.WriteMessages;
+import scala.concurrent.Future;
+
+public class SegmentedFileJournalTest {
+    private static final File DIRECTORY = new File("target/sfj-test");
+    private static final int SEGMENT_SIZE = 1024 * 1024;
+    private static final int MESSAGE_SIZE = 512 * 1024;
+
+    private static ActorSystem SYSTEM;
+
+    private TestKit kit;
+    private ActorRef actor;
+
+    @BeforeClass
+    public static void beforeClass() {
+        SYSTEM = ActorSystem.create("test");
+    }
+
+    @AfterClass
+    public static void afterClass() {
+        TestKit.shutdownActorSystem(SYSTEM);
+        SYSTEM = null;
+    }
+
+    @Before
+    public void before() {
+        kit = new TestKit(SYSTEM);
+        FileUtils.deleteQuietly(DIRECTORY);
+        actor = actor();
+    }
+
+    @After
+    public void after() {
+        actor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+    }
+
+    @Test
+    public void testDeleteAfterStop() {
+        // Preliminary setup
+        final WriteMessages write = new WriteMessages();
+        final Future<Optional<Exception>> first = write.add(AtomicWrite.apply(PersistentRepr.apply("first", 1, "foo",
+            null, false, kit.getRef(), "uuid")));
+        final Future<Optional<Exception>> second = write.add(AtomicWrite.apply(PersistentRepr.apply("second", 2, "foo",
+            null, false, kit.getRef(), "uuid")));
+        actor.tell(write, ActorRef.noSender());
+        assertFalse(getFuture(first).isPresent());
+        assertFalse(getFuture(second).isPresent());
+
+        assertHighestSequenceNr(2);
+        assertReplayCount(2);
+
+        deleteEntries(1);
+
+        assertHighestSequenceNr(2);
+        assertReplayCount(1);
+
+        // Restart actor
+        actor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+        actor = actor();
+
+        // Check if state is retained
+        assertHighestSequenceNr(2);
+        assertReplayCount(1);
+    }
+
+    @Test
+    public void testSegmentation() throws IOException {
+        // We want to have roughly three segments
+        final LargePayload payload = new LargePayload();
+
+        final WriteMessages write = new WriteMessages();
+        final List<Future<Optional<Exception>>> requests = new ArrayList<>();
+
+        // Each payload is half of segment size, plus some overhead, should result in two segments being present
+        for (int i = 1; i <= SEGMENT_SIZE * 3 / MESSAGE_SIZE; ++i) {
+            requests.add(write.add(AtomicWrite.apply(PersistentRepr.apply(payload, i, "foo", null, false, kit.getRef(),
+                "uuid"))));
+        }
+
+        actor.tell(write, ActorRef.noSender());
+        requests.forEach(future -> assertFalse(getFuture(future).isPresent()));
+
+        assertFileCount(2, 1);
+
+        // Delete all but the last entry
+        deleteEntries(requests.size());
+
+        assertFileCount(1, 1);
+    }
+
+    private ActorRef actor() {
+        return kit.childActorOf(SegmentedJournalActor.props("foo", DIRECTORY, StorageLevel.DISK, MESSAGE_SIZE,
+            SEGMENT_SIZE).withDispatcher(CallingThreadDispatcher.Id()));
+    }
+
+    private void deleteEntries(final long deleteTo) {
+        final AsyncMessage<Void> delete = SegmentedJournalActor.deleteMessagesTo(deleteTo);
+        actor.tell(delete, ActorRef.noSender());
+        assertNull(get(delete));
+    }
+
+    private void assertHighestSequenceNr(final long expected) {
+        AsyncMessage<Long> highest = SegmentedJournalActor.readHighestSequenceNr(0);
+        actor.tell(highest, ActorRef.noSender());
+        assertEquals(expected, (long) get(highest));
+    }
+
+    private void assertReplayCount(final int expected) {
+        Consumer<PersistentRepr> firstCallback = mock(Consumer.class);
+        doNothing().when(firstCallback).accept(any(PersistentRepr.class));
+        AsyncMessage<Void> replay = SegmentedJournalActor.replayMessages(0, Long.MAX_VALUE, Long.MAX_VALUE,
+            firstCallback);
+        actor.tell(replay, ActorRef.noSender());
+        assertNull(get(replay));
+        verify(firstCallback, times(expected)).accept(any(PersistentRepr.class));
+    }
+
+    private static void assertFileCount(final long dataFiles, final long deleteFiles) throws IOException {
+        List<File> contents = Files.list(DIRECTORY.toPath()).map(Path::toFile).collect(Collectors.toList());
+        assertEquals(dataFiles, contents.stream().filter(file -> file.getName().startsWith("data-")).count());
+        assertEquals(deleteFiles, contents.stream().filter(file -> file.getName().startsWith("delete-")).count());
+    }
+
+    private static <T> T get(final AsyncMessage<T> message) {
+        return getFuture(message.promise.future());
+    }
+
+    private static <T> T getFuture(final Future<T> future) {
+        assertTrue(future.isCompleted());
+        return future.value().get().get();
+    }
+
+    private static final class LargePayload implements Serializable {
+        private static final long serialVersionUID = 1L;
+
+        final byte[] bytes = new byte[MESSAGE_SIZE / 2];
+
+    }
+}