4b241f95768882d498765bfc6b35a367598785b6
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / MockRaftActor.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
4  *
5  * This program and the accompanying materials are made available under the
6  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7  * and is available at http://www.eclipse.org/legal/epl-v10.html
8  */
9 package org.opendaylight.controller.cluster.raft;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.mockito.Mockito.mock;
13
14 import akka.actor.ActorRef;
15 import akka.actor.Props;
16 import com.google.common.io.ByteSource;
17 import com.google.common.util.concurrent.Uninterruptibles;
18 import java.io.IOException;
19 import java.io.OutputStream;
20 import java.util.ArrayList;
21 import java.util.Collections;
22 import java.util.List;
23 import java.util.Map;
24 import java.util.Optional;
25 import java.util.concurrent.CountDownLatch;
26 import java.util.concurrent.TimeUnit;
27 import java.util.function.Function;
28 import org.apache.commons.lang3.SerializationUtils;
29 import org.opendaylight.controller.cluster.DataPersistenceProvider;
30 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
31 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
32 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
33 import org.opendaylight.yangtools.concepts.Identifier;
34
35 public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort {
36     public static final short PAYLOAD_VERSION = 5;
37
38     final RaftActor actorDelegate;
39     final RaftActorRecoveryCohort recoveryCohortDelegate;
40     volatile RaftActorSnapshotCohort snapshotCohortDelegate;
41     private final CountDownLatch recoveryComplete = new CountDownLatch(1);
42     private final List<Object> state;
43     private final ActorRef roleChangeNotifier;
44     protected final CountDownLatch initializeBehaviorComplete = new CountDownLatch(1);
45     private RaftActorRecoverySupport raftActorRecoverySupport;
46     private RaftActorSnapshotMessageSupport snapshotMessageSupport;
47     private final Snapshot restoreFromSnapshot;
48     final CountDownLatch snapshotCommitted = new CountDownLatch(1);
49     private final Function<Runnable, Void> pauseLeaderFunction;
50
51     protected MockRaftActor(final AbstractBuilder<?, ?> builder) {
52         super(builder.id, builder.peerAddresses != null ? builder.peerAddresses :
53             Collections.emptyMap(), Optional.ofNullable(builder.config), PAYLOAD_VERSION);
54         state = Collections.synchronizedList(new ArrayList<>());
55         this.actorDelegate = mock(RaftActor.class);
56         this.recoveryCohortDelegate = mock(RaftActorRecoveryCohort.class);
57
58         this.snapshotCohortDelegate = builder.snapshotCohort != null ? builder.snapshotCohort :
59             mock(RaftActorSnapshotCohort.class);
60
61         if (builder.dataPersistenceProvider == null) {
62             setPersistence(builder.persistent.isPresent() ? builder.persistent.get() : true);
63         } else {
64             setPersistence(builder.dataPersistenceProvider);
65         }
66
67         roleChangeNotifier = builder.roleChangeNotifier;
68         snapshotMessageSupport = builder.snapshotMessageSupport;
69         restoreFromSnapshot = builder.restoreFromSnapshot;
70         pauseLeaderFunction = builder.pauseLeaderFunction;
71     }
72
73     public void setRaftActorRecoverySupport(final RaftActorRecoverySupport support) {
74         raftActorRecoverySupport = support;
75     }
76
77     @Override
78     public RaftActorRecoverySupport newRaftActorRecoverySupport() {
79         return raftActorRecoverySupport != null ? raftActorRecoverySupport : super.newRaftActorRecoverySupport();
80     }
81
82     @Override
83     protected RaftActorSnapshotMessageSupport newRaftActorSnapshotMessageSupport() {
84         return snapshotMessageSupport != null ? snapshotMessageSupport :
85             (snapshotMessageSupport = super.newRaftActorSnapshotMessageSupport());
86     }
87
88     @Override
89     public RaftActorContext getRaftActorContext() {
90         return super.getRaftActorContext();
91     }
92
93     public RaftActorSnapshotMessageSupport getSnapshotMessageSupport() {
94         return snapshotMessageSupport;
95     }
96
97     public void waitForRecoveryComplete() {
98         try {
99             assertEquals("Recovery complete", true, recoveryComplete.await(5,  TimeUnit.SECONDS));
100         } catch (InterruptedException e) {
101             throw new RuntimeException(e);
102         }
103     }
104
105     public void waitForInitializeBehaviorComplete() {
106         try {
107             assertEquals("Behavior initialized", true, initializeBehaviorComplete.await(5,  TimeUnit.SECONDS));
108         } catch (InterruptedException e) {
109             throw new RuntimeException(e);
110         }
111     }
112
113
114     public void waitUntilLeader() {
115         for (int i = 0; i < 10; i++) {
116             if (isLeader()) {
117                 break;
118             }
119             Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
120         }
121     }
122
123     public List<Object> getState() {
124         return state;
125     }
126
127     @Override
128     protected void applyState(final ActorRef clientActor, final Identifier identifier, final Object data) {
129         actorDelegate.applyState(clientActor, identifier, data);
130         LOG.info("{}: applyState called: {}", persistenceId(), data);
131
132         state.add(data);
133     }
134
135     @Override
136     protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
137         return this;
138     }
139
140     @Override
141     protected RaftActorSnapshotCohort getRaftActorSnapshotCohort() {
142         return this;
143     }
144
145     @Override
146     public void startLogRecoveryBatch(final int maxBatchSize) {
147     }
148
149     @Override
150     public void appendRecoveredLogEntry(final Payload data) {
151         state.add(data);
152     }
153
154     @Override
155     public void applyCurrentLogRecoveryBatch() {
156     }
157
158     @Override
159     protected void onRecoveryComplete() {
160         actorDelegate.onRecoveryComplete();
161         recoveryComplete.countDown();
162     }
163
164     @Override
165     protected void initializeBehavior() {
166         super.initializeBehavior();
167         initializeBehaviorComplete.countDown();
168     }
169
170     @Override
171     public void applyRecoverySnapshot(final Snapshot.State newState) {
172         recoveryCohortDelegate.applyRecoverySnapshot(newState);
173         applySnapshotState(newState);
174     }
175
176     private void applySnapshotState(final Snapshot.State newState) {
177         if (newState instanceof MockSnapshotState) {
178             state.clear();
179             state.addAll(((MockSnapshotState)newState).getState());
180         }
181     }
182
183     @Override
184     public void createSnapshot(final ActorRef actorRef, final Optional<OutputStream> installSnapshotStream) {
185         LOG.info("{}: createSnapshot called", persistenceId());
186         snapshotCohortDelegate.createSnapshot(actorRef, installSnapshotStream);
187     }
188
189     @Override
190     public void applySnapshot(final Snapshot.State newState) {
191         LOG.info("{}: applySnapshot called", persistenceId());
192         applySnapshotState(newState);
193         snapshotCohortDelegate.applySnapshot(newState);
194     }
195
196     @Override
197     public Snapshot.State deserializeSnapshot(final ByteSource snapshotBytes) {
198         try {
199             return (Snapshot.State) SerializationUtils.deserialize(snapshotBytes.read());
200         } catch (IOException e) {
201             throw new RuntimeException("Error deserializing state", e);
202         }
203     }
204
205     @Override
206     protected void onStateChanged() {
207         actorDelegate.onStateChanged();
208     }
209
210     @Override
211     protected Optional<ActorRef> getRoleChangeNotifier() {
212         return Optional.ofNullable(roleChangeNotifier);
213     }
214
215     @Override public String persistenceId() {
216         return this.getId();
217     }
218
219     protected void newBehavior(final RaftActorBehavior newBehavior) {
220         self().tell(newBehavior, ActorRef.noSender());
221     }
222
223     @Override
224     protected void handleCommand(final Object message) {
225         if (message instanceof RaftActorBehavior) {
226             super.changeCurrentBehavior((RaftActorBehavior)message);
227         } else {
228             super.handleCommand(message);
229
230             if (RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT.equals(message)) {
231                 snapshotCommitted.countDown();
232             }
233         }
234     }
235
236     @Override
237     protected void pauseLeader(final Runnable operation) {
238         if (pauseLeaderFunction != null) {
239             pauseLeaderFunction.apply(operation);
240         } else {
241             super.pauseLeader(operation);
242         }
243     }
244
245     public static List<Object> fromState(final Snapshot.State from) {
246         if (from instanceof MockSnapshotState) {
247             return ((MockSnapshotState)from).getState();
248         }
249
250         throw new IllegalStateException("Unexpected snapshot State: " + from);
251     }
252
253     public ReplicatedLog getReplicatedLog() {
254         return this.getRaftActorContext().getReplicatedLog();
255     }
256
257     @Override
258     public Snapshot getRestoreFromSnapshot() {
259         return restoreFromSnapshot;
260     }
261
262     public static Props props(final String id, final Map<String, String> peerAddresses, final ConfigParams config) {
263         return builder().id(id).peerAddresses(peerAddresses).config(config).props();
264     }
265
266     public static Props props(final String id, final Map<String, String> peerAddresses,
267                               final ConfigParams config, final DataPersistenceProvider dataPersistenceProvider) {
268         return builder().id(id).peerAddresses(peerAddresses).config(config)
269                 .dataPersistenceProvider(dataPersistenceProvider).props();
270     }
271
272     public static Builder builder() {
273         return new Builder();
274     }
275
276     public static class AbstractBuilder<T extends AbstractBuilder<T, A>, A extends MockRaftActor> {
277         private Map<String, String> peerAddresses = Collections.emptyMap();
278         private String id;
279         private ConfigParams config;
280         private DataPersistenceProvider dataPersistenceProvider;
281         private ActorRef roleChangeNotifier;
282         private RaftActorSnapshotMessageSupport snapshotMessageSupport;
283         private Snapshot restoreFromSnapshot;
284         private Optional<Boolean> persistent = Optional.empty();
285         private final Class<A> actorClass;
286         private Function<Runnable, Void> pauseLeaderFunction;
287         private RaftActorSnapshotCohort snapshotCohort;
288
289         protected AbstractBuilder(final Class<A> actorClass) {
290             this.actorClass = actorClass;
291         }
292
293         @SuppressWarnings("unchecked")
294         private T self() {
295             return (T) this;
296         }
297
298         public T id(final String newId) {
299             this.id = newId;
300             return self();
301         }
302
303         public T peerAddresses(final Map<String, String> newPeerAddresses) {
304             this.peerAddresses = newPeerAddresses;
305             return self();
306         }
307
308         public T config(final ConfigParams newConfig) {
309             this.config = newConfig;
310             return self();
311         }
312
313         public T dataPersistenceProvider(final DataPersistenceProvider newDataPersistenceProvider) {
314             this.dataPersistenceProvider = newDataPersistenceProvider;
315             return self();
316         }
317
318         public T roleChangeNotifier(final ActorRef newRoleChangeNotifier) {
319             this.roleChangeNotifier = newRoleChangeNotifier;
320             return self();
321         }
322
323         public T snapshotMessageSupport(final RaftActorSnapshotMessageSupport newSnapshotMessageSupport) {
324             this.snapshotMessageSupport = newSnapshotMessageSupport;
325             return self();
326         }
327
328         public T restoreFromSnapshot(final Snapshot newRestoreFromSnapshot) {
329             this.restoreFromSnapshot = newRestoreFromSnapshot;
330             return self();
331         }
332
333         public T persistent(final Optional<Boolean> newPersistent) {
334             this.persistent = newPersistent;
335             return self();
336         }
337
338         public T pauseLeaderFunction(final Function<Runnable, Void> newPauseLeaderFunction) {
339             this.pauseLeaderFunction = newPauseLeaderFunction;
340             return self();
341         }
342
343         public T snapshotCohort(final RaftActorSnapshotCohort newSnapshotCohort) {
344             this.snapshotCohort = newSnapshotCohort;
345             return self();
346         }
347
348         public Props props() {
349             return Props.create(actorClass, this);
350         }
351     }
352
353     public static class Builder extends AbstractBuilder<Builder, MockRaftActor> {
354         Builder() {
355             super(MockRaftActor.class);
356         }
357     }
358
359     public static class MockSnapshotState implements Snapshot.State {
360         private static final long serialVersionUID = 1L;
361
362         private final List<Object> state;
363
364         public MockSnapshotState(final List<Object> state) {
365             this.state = state;
366         }
367
368         public List<Object> getState() {
369             return state;
370         }
371
372         @Override
373         public int hashCode() {
374             final int prime = 31;
375             int result = 1;
376             result = prime * result + (state == null ? 0 : state.hashCode());
377             return result;
378         }
379
380         @Override
381         public boolean equals(final Object obj) {
382             if (this == obj) {
383                 return true;
384             }
385             if (obj == null) {
386                 return false;
387             }
388             if (getClass() != obj.getClass()) {
389                 return false;
390             }
391             MockSnapshotState other = (MockSnapshotState) obj;
392             if (state == null) {
393                 if (other.state != null) {
394                     return false;
395                 }
396             } else if (!state.equals(other.state)) {
397                 return false;
398             }
399             return true;
400         }
401
402         @Override
403         public String toString() {
404             return "MockSnapshotState [state=" + state + "]";
405         }
406     }
407 }