Improve segmented journal actor metrics
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / MockRaftActor.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
4  *
5  * This program and the accompanying materials are made available under the
6  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7  * and is available at http://www.eclipse.org/legal/epl-v10.html
8  */
9 package org.opendaylight.controller.cluster.raft;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.mockito.Mockito.mock;
13
14 import akka.actor.ActorRef;
15 import akka.actor.Props;
16 import com.google.common.io.ByteSource;
17 import com.google.common.util.concurrent.Uninterruptibles;
18 import java.io.IOException;
19 import java.io.OutputStream;
20 import java.util.ArrayList;
21 import java.util.Collections;
22 import java.util.List;
23 import java.util.Map;
24 import java.util.Objects;
25 import java.util.Optional;
26 import java.util.concurrent.CountDownLatch;
27 import java.util.concurrent.TimeUnit;
28 import java.util.function.Function;
29 import org.apache.commons.lang3.SerializationUtils;
30 import org.opendaylight.controller.cluster.DataPersistenceProvider;
31 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
32 import org.opendaylight.controller.cluster.raft.messages.Payload;
33 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
34 import org.opendaylight.yangtools.concepts.Identifier;
35
36 public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort {
37     public static final short PAYLOAD_VERSION = 5;
38
39     final RaftActor actorDelegate;
40     final RaftActorRecoveryCohort recoveryCohortDelegate;
41     volatile RaftActorSnapshotCohort snapshotCohortDelegate;
42     private final CountDownLatch recoveryComplete = new CountDownLatch(1);
43     private final List<Object> state;
44     private final ActorRef roleChangeNotifier;
45     protected final CountDownLatch initializeBehaviorComplete = new CountDownLatch(1);
46     private RaftActorRecoverySupport raftActorRecoverySupport;
47     private RaftActorSnapshotMessageSupport snapshotMessageSupport;
48     private final Snapshot restoreFromSnapshot;
49     final CountDownLatch snapshotCommitted = new CountDownLatch(1);
50     private final Function<Runnable, Void> pauseLeaderFunction;
51
52     protected MockRaftActor(final AbstractBuilder<?, ?> builder) {
53         super(builder.id, builder.peerAddresses != null ? builder.peerAddresses :
54             Collections.emptyMap(), Optional.ofNullable(builder.config), PAYLOAD_VERSION);
55         state = Collections.synchronizedList(new ArrayList<>());
56         actorDelegate = mock(RaftActor.class);
57         recoveryCohortDelegate = mock(RaftActorRecoveryCohort.class);
58
59         snapshotCohortDelegate = builder.snapshotCohort != null ? builder.snapshotCohort :
60             mock(RaftActorSnapshotCohort.class);
61
62         if (builder.dataPersistenceProvider == null) {
63             setPersistence(builder.persistent.isPresent() ? builder.persistent.orElseThrow() : true);
64         } else {
65             setPersistence(builder.dataPersistenceProvider);
66         }
67
68         roleChangeNotifier = builder.roleChangeNotifier;
69         snapshotMessageSupport = builder.snapshotMessageSupport;
70         restoreFromSnapshot = builder.restoreFromSnapshot;
71         pauseLeaderFunction = builder.pauseLeaderFunction;
72     }
73
74     public void setRaftActorRecoverySupport(final RaftActorRecoverySupport support) {
75         raftActorRecoverySupport = support;
76     }
77
78     @Override
79     public RaftActorRecoverySupport newRaftActorRecoverySupport() {
80         return raftActorRecoverySupport != null ? raftActorRecoverySupport : super.newRaftActorRecoverySupport();
81     }
82
83     @Override
84     protected RaftActorSnapshotMessageSupport newRaftActorSnapshotMessageSupport() {
85         return snapshotMessageSupport != null ? snapshotMessageSupport :
86             (snapshotMessageSupport = super.newRaftActorSnapshotMessageSupport());
87     }
88
89     @Override
90     public RaftActorContext getRaftActorContext() {
91         return super.getRaftActorContext();
92     }
93
94     public RaftActorSnapshotMessageSupport getSnapshotMessageSupport() {
95         return snapshotMessageSupport;
96     }
97
98     public void waitForRecoveryComplete() {
99         try {
100             assertEquals("Recovery complete", true, recoveryComplete.await(5,  TimeUnit.SECONDS));
101         } catch (InterruptedException e) {
102             throw new RuntimeException(e);
103         }
104     }
105
106     public void waitForInitializeBehaviorComplete() {
107         try {
108             assertEquals("Behavior initialized", true, initializeBehaviorComplete.await(5,  TimeUnit.SECONDS));
109         } catch (InterruptedException e) {
110             throw new RuntimeException(e);
111         }
112     }
113
114
115     public void waitUntilLeader() {
116         for (int i = 0; i < 10; i++) {
117             if (isLeader()) {
118                 break;
119             }
120             Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
121         }
122     }
123
124     public List<Object> getState() {
125         return state;
126     }
127
128     @Override
129     protected void applyState(final ActorRef clientActor, final Identifier identifier, final Object data) {
130         actorDelegate.applyState(clientActor, identifier, data);
131         LOG.info("{}: applyState called: {}", persistenceId(), data);
132
133         state.add(data);
134     }
135
136     @Override
137     protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
138         return this;
139     }
140
141     @Override
142     protected RaftActorSnapshotCohort getRaftActorSnapshotCohort() {
143         return this;
144     }
145
146     @Override
147     public void startLogRecoveryBatch(final int maxBatchSize) {
148     }
149
150     @Override
151     public void appendRecoveredLogEntry(final Payload data) {
152         state.add(data);
153     }
154
155     @Override
156     public void applyCurrentLogRecoveryBatch() {
157     }
158
159     @Override
160     protected void onRecoveryComplete() {
161         actorDelegate.onRecoveryComplete();
162         recoveryComplete.countDown();
163     }
164
165     @Override
166     protected void initializeBehavior() {
167         super.initializeBehavior();
168         initializeBehaviorComplete.countDown();
169     }
170
171     @Override
172     public void applyRecoverySnapshot(final Snapshot.State newState) {
173         recoveryCohortDelegate.applyRecoverySnapshot(newState);
174         applySnapshotState(newState);
175     }
176
177     private void applySnapshotState(final Snapshot.State newState) {
178         if (newState instanceof MockSnapshotState mockState) {
179             state.clear();
180             state.addAll(mockState.getState());
181         }
182     }
183
184     @Override
185     public void createSnapshot(final ActorRef actorRef, final Optional<OutputStream> installSnapshotStream) {
186         LOG.info("{}: createSnapshot called", persistenceId());
187         snapshotCohortDelegate.createSnapshot(actorRef, installSnapshotStream);
188     }
189
190     @Override
191     public void applySnapshot(final Snapshot.State newState) {
192         LOG.info("{}: applySnapshot called", persistenceId());
193         applySnapshotState(newState);
194         snapshotCohortDelegate.applySnapshot(newState);
195     }
196
197     @Override
198     public Snapshot.State deserializeSnapshot(final ByteSource snapshotBytes) {
199         try {
200             return (Snapshot.State) SerializationUtils.deserialize(snapshotBytes.read());
201         } catch (IOException e) {
202             throw new RuntimeException("Error deserializing state", e);
203         }
204     }
205
206     @Override
207     protected void onStateChanged() {
208         actorDelegate.onStateChanged();
209     }
210
211     @Override
212     protected Optional<ActorRef> getRoleChangeNotifier() {
213         return Optional.ofNullable(roleChangeNotifier);
214     }
215
216     @Override public String persistenceId() {
217         return getId();
218     }
219
220     protected void newBehavior(final RaftActorBehavior newBehavior) {
221         self().tell(newBehavior, ActorRef.noSender());
222     }
223
224     @Override
225     protected void handleCommand(final Object message) {
226         if (message instanceof RaftActorBehavior) {
227             super.changeCurrentBehavior((RaftActorBehavior)message);
228         } else {
229             super.handleCommand(message);
230
231             if (RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT.equals(message)) {
232                 snapshotCommitted.countDown();
233             }
234         }
235     }
236
237     @Override
238     protected void pauseLeader(final Runnable operation) {
239         if (pauseLeaderFunction != null) {
240             pauseLeaderFunction.apply(operation);
241         } else {
242             super.pauseLeader(operation);
243         }
244     }
245
246     public static List<Object> fromState(final Snapshot.State from) {
247         if (from instanceof MockSnapshotState mockState) {
248             return mockState.getState();
249         }
250
251         throw new IllegalStateException("Unexpected snapshot State: " + from);
252     }
253
254     public ReplicatedLog getReplicatedLog() {
255         return getRaftActorContext().getReplicatedLog();
256     }
257
258     @Override
259     public Snapshot getRestoreFromSnapshot() {
260         return restoreFromSnapshot;
261     }
262
263     public static Props props(final String id, final Map<String, String> peerAddresses, final ConfigParams config) {
264         return builder().id(id).peerAddresses(peerAddresses).config(config).props();
265     }
266
267     public static Props props(final String id, final Map<String, String> peerAddresses,
268                               final ConfigParams config, final DataPersistenceProvider dataPersistenceProvider) {
269         return builder().id(id).peerAddresses(peerAddresses).config(config)
270                 .dataPersistenceProvider(dataPersistenceProvider).props();
271     }
272
273     public static Builder builder() {
274         return new Builder();
275     }
276
277     public static class AbstractBuilder<T extends AbstractBuilder<T, A>, A extends MockRaftActor> {
278         private Map<String, String> peerAddresses = Collections.emptyMap();
279         private String id;
280         private ConfigParams config;
281         private DataPersistenceProvider dataPersistenceProvider;
282         private ActorRef roleChangeNotifier;
283         private RaftActorSnapshotMessageSupport snapshotMessageSupport;
284         private Snapshot restoreFromSnapshot;
285         private Optional<Boolean> persistent = Optional.empty();
286         private final Class<A> actorClass;
287         private Function<Runnable, Void> pauseLeaderFunction;
288         private RaftActorSnapshotCohort snapshotCohort;
289
290         protected AbstractBuilder(final Class<A> actorClass) {
291             this.actorClass = actorClass;
292         }
293
294         @SuppressWarnings("unchecked")
295         private T self() {
296             return (T) this;
297         }
298
299         public T id(final String newId) {
300             id = newId;
301             return self();
302         }
303
304         public T peerAddresses(final Map<String, String> newPeerAddresses) {
305             peerAddresses = newPeerAddresses;
306             return self();
307         }
308
309         public T config(final ConfigParams newConfig) {
310             config = newConfig;
311             return self();
312         }
313
314         public T dataPersistenceProvider(final DataPersistenceProvider newDataPersistenceProvider) {
315             dataPersistenceProvider = newDataPersistenceProvider;
316             return self();
317         }
318
319         public T roleChangeNotifier(final ActorRef newRoleChangeNotifier) {
320             roleChangeNotifier = newRoleChangeNotifier;
321             return self();
322         }
323
324         public T snapshotMessageSupport(final RaftActorSnapshotMessageSupport newSnapshotMessageSupport) {
325             snapshotMessageSupport = newSnapshotMessageSupport;
326             return self();
327         }
328
329         public T restoreFromSnapshot(final Snapshot newRestoreFromSnapshot) {
330             restoreFromSnapshot = newRestoreFromSnapshot;
331             return self();
332         }
333
334         public T persistent(final Optional<Boolean> newPersistent) {
335             persistent = newPersistent;
336             return self();
337         }
338
339         public T pauseLeaderFunction(final Function<Runnable, Void> newPauseLeaderFunction) {
340             pauseLeaderFunction = newPauseLeaderFunction;
341             return self();
342         }
343
344         public T snapshotCohort(final RaftActorSnapshotCohort newSnapshotCohort) {
345             snapshotCohort = newSnapshotCohort;
346             return self();
347         }
348
349         public Props props() {
350             return Props.create(actorClass, this);
351         }
352     }
353
354     public static class Builder extends AbstractBuilder<Builder, MockRaftActor> {
355         Builder() {
356             super(MockRaftActor.class);
357         }
358     }
359
360     public static class MockSnapshotState implements Snapshot.State {
361         private static final long serialVersionUID = 1L;
362
363         private final List<Object> state;
364
365         public MockSnapshotState(final List<Object> state) {
366             this.state = state;
367         }
368
369         public List<Object> getState() {
370             return state;
371         }
372
373         @Override
374         public int hashCode() {
375             return Objects.hash(state);
376         }
377
378         @Override
379         public boolean equals(final Object obj) {
380             if (this == obj) {
381                 return true;
382             }
383             if (obj == null) {
384                 return false;
385             }
386             if (getClass() != obj.getClass()) {
387                 return false;
388             }
389             MockSnapshotState other = (MockSnapshotState) obj;
390             if (!Objects.equals(state, other.state)) {
391                 return false;
392             }
393             return true;
394         }
395
396         @Override
397         public String toString() {
398             return "MockSnapshotState [state=" + state + "]";
399         }
400     }
401 }