Improve segmented journal actor metrics
[controller.git] / opendaylight / md-sal / sal-akka-segmented-journal / src / test / java / org / opendaylight / controller / akka / segjournal / PerformanceTest.java
1 /*
2  * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.akka.segjournal;
9
10 import static org.junit.jupiter.api.Assertions.assertTrue;
11
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSystem;
14 import akka.actor.PoisonPill;
15 import akka.persistence.AtomicWrite;
16 import akka.persistence.PersistentRepr;
17 import akka.testkit.CallingThreadDispatcher;
18 import akka.testkit.javadsl.TestKit;
19 import com.codahale.metrics.Histogram;
20 import com.codahale.metrics.UniformReservoir;
21 import com.google.common.base.Stopwatch;
22 import com.google.common.base.Ticker;
23 import io.atomix.storage.journal.StorageLevel;
24 import java.io.File;
25 import java.io.Serializable;
26 import java.util.List;
27 import java.util.Optional;
28 import java.util.concurrent.ThreadLocalRandom;
29 import org.apache.commons.io.FileUtils;
30 import org.junit.jupiter.api.AfterAll;
31 import org.junit.jupiter.api.AfterEach;
32 import org.junit.jupiter.api.BeforeAll;
33 import org.junit.jupiter.api.BeforeEach;
34 import org.junit.jupiter.api.Disabled;
35 import org.junit.jupiter.params.ParameterizedTest;
36 import org.junit.jupiter.params.provider.Arguments;
37 import org.junit.jupiter.params.provider.MethodSource;
38 import org.opendaylight.controller.akka.segjournal.SegmentedJournalActor.WriteMessages;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41 import scala.concurrent.Future;
42
43 class PerformanceTest {
44     private static final class Payload implements Serializable {
45         @java.io.Serial
46         private static final long serialVersionUID = 1L;
47
48         final byte[] bytes;
49
50         Payload(final int size, final ThreadLocalRandom random) {
51             bytes = new byte[size];
52             random.nextBytes(bytes);
53         }
54     }
55
56     private static final class Request {
57         final WriteMessages write = new WriteMessages();
58         final Future<Optional<Exception>> future;
59
60         Request(final AtomicWrite atomicWrite) {
61             future = write.add(atomicWrite);
62         }
63     }
64
65     private static final Logger LOG = LoggerFactory.getLogger(PerformanceTest.class);
66     private static final File DIRECTORY = new File("target/sfj-perf");
67
68     private static ActorSystem SYSTEM;
69
70     private TestKit kit;
71     private ActorRef actor;
72
73     @BeforeAll
74     static void beforeClass() {
75         SYSTEM = ActorSystem.create("test");
76     }
77
78     @AfterAll
79     static void afterClass() {
80         TestKit.shutdownActorSystem(SYSTEM);
81         SYSTEM = null;
82     }
83
84     @BeforeEach
85     void before() {
86         kit = new TestKit(SYSTEM);
87         FileUtils.deleteQuietly(DIRECTORY);
88     }
89
90     @AfterEach
91     void after() {
92         if (actor != null) {
93             actor.tell(PoisonPill.getInstance(), ActorRef.noSender());
94         }
95         FileUtils.deleteQuietly(DIRECTORY);
96     }
97
98     @Disabled("Disable due to being an extensive time hog")
99     @ParameterizedTest
100     @MethodSource
101     void writeRequests(final StorageLevel storage, final int maxEntrySize, final int maxSegmentSize,
102             final int payloadSize, final int requestCount) {
103         LOG.info("Test {} entrySize={} segmentSize={} payload={} count={}", storage, maxEntrySize, maxSegmentSize,
104             payloadSize, requestCount);
105
106         actor = kit.childActorOf(
107             SegmentedJournalActor.props("perf", DIRECTORY, storage, maxEntrySize, maxSegmentSize, maxEntrySize)
108             .withDispatcher(CallingThreadDispatcher.Id()));
109
110         final var random = ThreadLocalRandom.current();
111         final var sw = Stopwatch.createStarted();
112         final var payloads = new Payload[1_000];
113         for (int i = 0; i < payloads.length; ++i) {
114             payloads[i] = new Payload(payloadSize, random);
115         }
116         LOG.info("{} payloads created in {}", payloads.length, sw.stop());
117
118         sw.reset().start();
119         final var requests = new Request[requestCount];
120         for (int i = 0; i < requests.length; ++i) {
121             requests[i] = new Request(AtomicWrite.apply(PersistentRepr.apply(payloads[random.nextInt(payloads.length)],
122                 i, "foo", null, false, kit.getRef(), "uuid")));
123         }
124         LOG.info("{} requests created in {}", requests.length, sw.stop());
125
126         final var histogram = new Histogram(new UniformReservoir(requests.length));
127         sw.reset().start();
128         long started = System.nanoTime();
129         for (var req : requests) {
130             actor.tell(req.write, ActorRef.noSender());
131             assertTrue(req.future.isCompleted());
132             assertTrue(req.future.value().get().get().isEmpty());
133
134             final long now = System.nanoTime();
135             histogram.update(now - started);
136             started = now;
137         }
138         sw.stop();
139         final var snap = histogram.getSnapshot();
140
141         LOG.info("{} requests completed in {}", requests.length, sw);
142         LOG.info("Minimum: {}", formatNanos(snap.getMin()));
143         LOG.info("Maximum: {}", formatNanos(snap.getMax()));
144         LOG.info("Mean:    {}", formatNanos(snap.getMean()));
145         LOG.info("StdDev:  {}", formatNanos(snap.getStdDev()));
146         LOG.info("Median:  {}", formatNanos(snap.getMedian()));
147         LOG.info("75th:    {}", formatNanos(snap.get75thPercentile()));
148         LOG.info("95th:    {}", formatNanos(snap.get95thPercentile()));
149         LOG.info("98th:    {}", formatNanos(snap.get98thPercentile()));
150         LOG.info("99th:    {}", formatNanos(snap.get99thPercentile()));
151         LOG.info("99.9th:  {}", formatNanos(snap.get999thPercentile()));
152     }
153
154     private static List<Arguments> writeRequests() {
155         return List.of(
156             // DISK:
157             // 100K requests, 10K each, 16M max, 128M segment
158             Arguments.of(StorageLevel.DISK, 16 * 1024 * 1024, 128 * 1024 * 1024,    10_000,  100_000),
159             // 100K requests, 10K each, 1M max, 16M segment
160             Arguments.of(StorageLevel.DISK,      1024 * 1024,  16 * 1024 * 1024,    10_000,  100_000),
161             // 10K requests, 100K each, 1M max, 16M segment
162             Arguments.of(StorageLevel.DISK,      1024 * 1024,  16 * 1024 * 1024,   100_000,   10_000),
163             // 1K requests, 1M each, 1M max, 16M segment
164             Arguments.of(StorageLevel.DISK,      1024 * 1024,  16 * 1024 * 1024, 1_000_000,    1_000),
165
166             // MAPPED:
167             // 100K requests, 10K each, 16M max, 128M segment
168             Arguments.of(StorageLevel.MAPPED, 16 * 1024 * 1024, 128 * 1024 * 1024,    10_000,  100_000),
169             // 100K requests, 10K each, 1M max, 16M segment
170             Arguments.of(StorageLevel.MAPPED,      1024 * 1024,  16 * 1024 * 1024,    10_000,  100_000),
171             // 10K requests, 100K each, 1M max, 16M segment
172             Arguments.of(StorageLevel.MAPPED,      1024 * 1024,  16 * 1024 * 1024,   100_000,   10_000),
173             // 1K requests, 1M each, 1M max, 16M segment
174             Arguments.of(StorageLevel.MAPPED,      1024 * 1024,  16 * 1024 * 1024, 1_000_000,    1_000));
175
176     }
177
178     private static String formatNanos(final double nanos) {
179         return formatNanos(Math.round(nanos));
180     }
181
182     private static String formatNanos(final long nanos) {
183         return Stopwatch.createStarted(new Ticker() {
184             boolean started;
185
186             @Override
187             public long read() {
188                 if (started) {
189                     return nanos;
190                 }
191                 started = true;
192                 return 0;
193             }
194         }).toString();
195     }
196 }