20adaf12beff56f0125441c265d75c2533b3c769
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / MockRaftActor.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
4  *
5  * This program and the accompanying materials are made available under the
6  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7  * and is available at http://www.eclipse.org/legal/epl-v10.html
8  */
9 package org.opendaylight.controller.cluster.raft;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.mockito.Mockito.mock;
13
14 import akka.actor.ActorRef;
15 import akka.actor.Props;
16 import com.google.common.base.Function;
17 import com.google.common.base.Optional;
18 import com.google.common.base.Throwables;
19 import com.google.common.io.ByteSource;
20 import com.google.common.util.concurrent.Uninterruptibles;
21 import java.io.IOException;
22 import java.io.OutputStream;
23 import java.util.ArrayList;
24 import java.util.Collections;
25 import java.util.List;
26 import java.util.Map;
27 import java.util.concurrent.CountDownLatch;
28 import java.util.concurrent.TimeUnit;
29 import javax.annotation.Nonnull;
30 import org.apache.commons.lang3.SerializationUtils;
31 import org.opendaylight.controller.cluster.DataPersistenceProvider;
32 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
33 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
34 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
35 import org.opendaylight.yangtools.concepts.Identifier;
36
37 public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort {
38     public static final short PAYLOAD_VERSION = 5;
39
40     final RaftActor actorDelegate;
41     final RaftActorRecoveryCohort recoveryCohortDelegate;
42     volatile RaftActorSnapshotCohort snapshotCohortDelegate;
43     private final CountDownLatch recoveryComplete = new CountDownLatch(1);
44     private final List<Object> state;
45     private final ActorRef roleChangeNotifier;
46     protected final CountDownLatch initializeBehaviorComplete = new CountDownLatch(1);
47     private RaftActorRecoverySupport raftActorRecoverySupport;
48     private RaftActorSnapshotMessageSupport snapshotMessageSupport;
49     private final byte[] restoreFromSnapshot;
50     final CountDownLatch snapshotCommitted = new CountDownLatch(1);
51     private final Function<Runnable, Void> pauseLeaderFunction;
52
53     protected MockRaftActor(AbstractBuilder<?, ?> builder) {
54         super(builder.id, builder.peerAddresses != null ? builder.peerAddresses :
55             Collections.<String, String>emptyMap(), Optional.fromNullable(builder.config), PAYLOAD_VERSION);
56         state = new ArrayList<>();
57         this.actorDelegate = mock(RaftActor.class);
58         this.recoveryCohortDelegate = mock(RaftActorRecoveryCohort.class);
59
60         this.snapshotCohortDelegate = builder.snapshotCohort != null ? builder.snapshotCohort :
61             mock(RaftActorSnapshotCohort.class);
62
63         if (builder.dataPersistenceProvider == null) {
64             setPersistence(builder.persistent.isPresent() ? builder.persistent.get() : true);
65         } else {
66             setPersistence(builder.dataPersistenceProvider);
67         }
68
69         roleChangeNotifier = builder.roleChangeNotifier;
70         snapshotMessageSupport = builder.snapshotMessageSupport;
71         restoreFromSnapshot = builder.restoreFromSnapshot;
72         pauseLeaderFunction = builder.pauseLeaderFunction;
73     }
74
75     public void setRaftActorRecoverySupport(RaftActorRecoverySupport support) {
76         raftActorRecoverySupport = support;
77     }
78
79     @Override
80     public RaftActorRecoverySupport newRaftActorRecoverySupport() {
81         return raftActorRecoverySupport != null ? raftActorRecoverySupport : super.newRaftActorRecoverySupport();
82     }
83
84     @Override
85     protected RaftActorSnapshotMessageSupport newRaftActorSnapshotMessageSupport() {
86         return snapshotMessageSupport != null ? snapshotMessageSupport :
87             (snapshotMessageSupport = super.newRaftActorSnapshotMessageSupport());
88     }
89
90     @Override
91     public RaftActorContext getRaftActorContext() {
92         return super.getRaftActorContext();
93     }
94
95     public RaftActorSnapshotMessageSupport getSnapshotMessageSupport() {
96         return snapshotMessageSupport;
97     }
98
99     public void waitForRecoveryComplete() {
100         try {
101             assertEquals("Recovery complete", true, recoveryComplete.await(5,  TimeUnit.SECONDS));
102         } catch (InterruptedException e) {
103             Throwables.propagate(e);
104         }
105     }
106
107     public void waitForInitializeBehaviorComplete() {
108         try {
109             assertEquals("Behavior initialized", true, initializeBehaviorComplete.await(5,  TimeUnit.SECONDS));
110         } catch (InterruptedException e) {
111             Throwables.propagate(e);
112         }
113     }
114
115
116     public void waitUntilLeader() {
117         for (int i = 0; i < 10; i++) {
118             if (isLeader()) {
119                 break;
120             }
121             Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
122         }
123     }
124
125     public List<Object> getState() {
126         return state;
127     }
128
129     @Override
130     protected void applyState(ActorRef clientActor, Identifier identifier, Object data) {
131         actorDelegate.applyState(clientActor, identifier, data);
132         LOG.info("{}: applyState called: {}", persistenceId(), data);
133
134         state.add(data);
135     }
136
137     @Override
138     @Nonnull
139     protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
140         return this;
141     }
142
143     @Override
144     protected RaftActorSnapshotCohort getRaftActorSnapshotCohort() {
145         return this;
146     }
147
148     @Override
149     public void startLogRecoveryBatch(int maxBatchSize) {
150     }
151
152     @Override
153     public void appendRecoveredLogEntry(Payload data) {
154         state.add(data);
155     }
156
157     @Override
158     public void applyCurrentLogRecoveryBatch() {
159     }
160
161     @Override
162     protected void onRecoveryComplete() {
163         actorDelegate.onRecoveryComplete();
164         recoveryComplete.countDown();
165     }
166
167     @Override
168     protected void initializeBehavior() {
169         super.initializeBehavior();
170         initializeBehaviorComplete.countDown();
171     }
172
173     @Override
174     public void applyRecoverySnapshot(Snapshot.State newState) {
175         recoveryCohortDelegate.applyRecoverySnapshot(newState);
176         applySnapshotState(newState);
177     }
178
179     private void applySnapshotState(Snapshot.State newState) {
180         if (newState instanceof MockSnapshotState) {
181             state.clear();
182             state.addAll(((MockSnapshotState)newState).getState());
183         }
184     }
185
186     @Override
187     public void createSnapshot(ActorRef actorRef, java.util.Optional<OutputStream> installSnapshotStream) {
188         LOG.info("{}: createSnapshot called", persistenceId());
189         snapshotCohortDelegate.createSnapshot(actorRef, installSnapshotStream);
190     }
191
192     @Override
193     public void applySnapshot(Snapshot.State newState) {
194         LOG.info("{}: applySnapshot called", persistenceId());
195         applySnapshotState(newState);
196         snapshotCohortDelegate.applySnapshot(newState);
197     }
198
199     @Override
200     public Snapshot.State deserializeSnapshot(ByteSource snapshotBytes) {
201         try {
202             return (Snapshot.State) SerializationUtils.deserialize(snapshotBytes.read());
203         } catch (IOException e) {
204             throw new RuntimeException("Error deserializing state", e);
205         }
206     }
207
208     @Override
209     public Snapshot.State deserializePreCarbonSnapshot(byte[] from) {
210         return new MockSnapshotState(SerializationUtils.deserialize(from));
211     }
212
213     @Override
214     protected void onStateChanged() {
215         actorDelegate.onStateChanged();
216     }
217
218     @Override
219     protected Optional<ActorRef> getRoleChangeNotifier() {
220         return Optional.fromNullable(roleChangeNotifier);
221     }
222
223     @Override public String persistenceId() {
224         return this.getId();
225     }
226
227     protected void newBehavior(RaftActorBehavior newBehavior) {
228         self().tell(newBehavior, ActorRef.noSender());
229     }
230
231     @Override
232     protected void handleCommand(final Object message) {
233         if (message instanceof RaftActorBehavior) {
234             super.changeCurrentBehavior((RaftActorBehavior)message);
235         } else {
236             super.handleCommand(message);
237
238             if (RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT.equals(message)) {
239                 snapshotCommitted.countDown();
240             }
241         }
242     }
243
244     @Override
245     protected void pauseLeader(Runnable operation) {
246         if (pauseLeaderFunction != null) {
247             pauseLeaderFunction.apply(operation);
248         } else {
249             super.pauseLeader(operation);
250         }
251     }
252
253     public static List<Object> fromState(Snapshot.State from) {
254         if (from instanceof MockSnapshotState) {
255             return ((MockSnapshotState)from).getState();
256         }
257
258         throw new IllegalStateException("Unexpected snapshot State: " + from);
259     }
260
261     public ReplicatedLog getReplicatedLog() {
262         return this.getRaftActorContext().getReplicatedLog();
263     }
264
265     @Override
266     public byte[] getRestoreFromSnapshot() {
267         return restoreFromSnapshot;
268     }
269
270     public static Props props(final String id, final Map<String, String> peerAddresses, ConfigParams config) {
271         return builder().id(id).peerAddresses(peerAddresses).config(config).props();
272     }
273
274     public static Props props(final String id, final Map<String, String> peerAddresses,
275                               ConfigParams config, DataPersistenceProvider dataPersistenceProvider) {
276         return builder().id(id).peerAddresses(peerAddresses).config(config)
277                 .dataPersistenceProvider(dataPersistenceProvider).props();
278     }
279
280     public static Builder builder() {
281         return new Builder();
282     }
283
284     public static class AbstractBuilder<T extends AbstractBuilder<T, A>, A extends MockRaftActor> {
285         private Map<String, String> peerAddresses = Collections.emptyMap();
286         private String id;
287         private ConfigParams config;
288         private DataPersistenceProvider dataPersistenceProvider;
289         private ActorRef roleChangeNotifier;
290         private RaftActorSnapshotMessageSupport snapshotMessageSupport;
291         private byte[] restoreFromSnapshot;
292         private Optional<Boolean> persistent = Optional.absent();
293         private final Class<A> actorClass;
294         private Function<Runnable, Void> pauseLeaderFunction;
295         private RaftActorSnapshotCohort snapshotCohort;
296
297         protected AbstractBuilder(Class<A> actorClass) {
298             this.actorClass = actorClass;
299         }
300
301         @SuppressWarnings("unchecked")
302         private T self() {
303             return (T) this;
304         }
305
306         public T id(String newId) {
307             this.id = newId;
308             return self();
309         }
310
311         public T peerAddresses(Map<String, String> newPeerAddresses) {
312             this.peerAddresses = newPeerAddresses;
313             return self();
314         }
315
316         public T config(ConfigParams newConfig) {
317             this.config = newConfig;
318             return self();
319         }
320
321         public T dataPersistenceProvider(DataPersistenceProvider newDataPersistenceProvider) {
322             this.dataPersistenceProvider = newDataPersistenceProvider;
323             return self();
324         }
325
326         public T roleChangeNotifier(ActorRef newRoleChangeNotifier) {
327             this.roleChangeNotifier = newRoleChangeNotifier;
328             return self();
329         }
330
331         public T snapshotMessageSupport(RaftActorSnapshotMessageSupport newSnapshotMessageSupport) {
332             this.snapshotMessageSupport = newSnapshotMessageSupport;
333             return self();
334         }
335
336         public T restoreFromSnapshot(byte[] newRestoreFromSnapshot) {
337             this.restoreFromSnapshot = newRestoreFromSnapshot;
338             return self();
339         }
340
341         public T persistent(Optional<Boolean> newPersistent) {
342             this.persistent = newPersistent;
343             return self();
344         }
345
346         public T pauseLeaderFunction(Function<Runnable, Void> newPauseLeaderFunction) {
347             this.pauseLeaderFunction = newPauseLeaderFunction;
348             return self();
349         }
350
351         public T snapshotCohort(RaftActorSnapshotCohort newSnapshotCohort) {
352             this.snapshotCohort = newSnapshotCohort;
353             return self();
354         }
355
356         public Props props() {
357             return Props.create(actorClass, this);
358         }
359     }
360
361     public static class Builder extends AbstractBuilder<Builder, MockRaftActor> {
362         private Builder() {
363             super(MockRaftActor.class);
364         }
365     }
366
367     public static class MockSnapshotState implements Snapshot.State {
368         private static final long serialVersionUID = 1L;
369
370         private final List<Object> state;
371
372         public MockSnapshotState(List<Object> state) {
373             this.state = state;
374         }
375
376         public List<Object> getState() {
377             return state;
378         }
379
380         @Override
381         public int hashCode() {
382             final int prime = 31;
383             int result = 1;
384             result = prime * result + (state == null ? 0 : state.hashCode());
385             return result;
386         }
387
388         @Override
389         public boolean equals(Object obj) {
390             if (this == obj) {
391                 return true;
392             }
393             if (obj == null) {
394                 return false;
395             }
396             if (getClass() != obj.getClass()) {
397                 return false;
398             }
399             MockSnapshotState other = (MockSnapshotState) obj;
400             if (state == null) {
401                 if (other.state != null) {
402                     return false;
403                 }
404             } else if (!state.equals(other.state)) {
405                 return false;
406             }
407             return true;
408         }
409
410         @Override
411         public String toString() {
412             return "MockSnapshotState [state=" + state + "]";
413         }
414     }
415 }