c96ab9a245d5d0cb40cb2bbfb4fd83bbab1c744b
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / MockRaftActor.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
4  *
5  * This program and the accompanying materials are made available under the
6  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7  * and is available at http://www.eclipse.org/legal/epl-v10.html
8  */
9 package org.opendaylight.controller.cluster.raft;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.mockito.Mockito.mock;
13 import akka.actor.ActorRef;
14 import akka.actor.Props;
15 import com.google.common.base.Function;
16 import com.google.common.base.Optional;
17 import com.google.common.util.concurrent.Uninterruptibles;
18 import java.io.ByteArrayInputStream;
19 import java.io.IOException;
20 import java.io.ObjectInputStream;
21 import java.util.ArrayList;
22 import java.util.Collections;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.concurrent.CountDownLatch;
26 import java.util.concurrent.TimeUnit;
27 import javax.annotation.Nonnull;
28 import org.opendaylight.controller.cluster.DataPersistenceProvider;
29 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
30 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
31
32 public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort {
33     public static final short PAYLOAD_VERSION = 5;
34
35     final RaftActor actorDelegate;
36     final RaftActorRecoveryCohort recoveryCohortDelegate;
37     volatile RaftActorSnapshotCohort snapshotCohortDelegate;
38     private final CountDownLatch recoveryComplete = new CountDownLatch(1);
39     private final List<Object> state;
40     private final ActorRef roleChangeNotifier;
41     protected final CountDownLatch initializeBehaviorComplete = new CountDownLatch(1);
42     private RaftActorRecoverySupport raftActorRecoverySupport;
43     private RaftActorSnapshotMessageSupport snapshotMessageSupport;
44     private final byte[] restoreFromSnapshot;
45     final CountDownLatch snapshotCommitted = new CountDownLatch(1);
46     private final Function<Runnable, Void> pauseLeaderFunction;
47
48     protected MockRaftActor(AbstractBuilder<?, ?> builder) {
49         super(builder.id, builder.peerAddresses, Optional.fromNullable(builder.config), PAYLOAD_VERSION);
50         state = new ArrayList<>();
51         this.actorDelegate = mock(RaftActor.class);
52         this.recoveryCohortDelegate = mock(RaftActorRecoveryCohort.class);
53         this.snapshotCohortDelegate = mock(RaftActorSnapshotCohort.class);
54
55         if(builder.dataPersistenceProvider == null){
56             setPersistence(builder.persistent.isPresent() ? builder.persistent.get() : true);
57         } else {
58             setPersistence(builder.dataPersistenceProvider);
59         }
60
61         roleChangeNotifier = builder.roleChangeNotifier;
62         snapshotMessageSupport = builder.snapshotMessageSupport;
63         restoreFromSnapshot = builder.restoreFromSnapshot;
64         pauseLeaderFunction = builder.pauseLeaderFunction;
65     }
66
67     public void setRaftActorRecoverySupport(RaftActorRecoverySupport support) {
68         raftActorRecoverySupport = support;
69     }
70
71     @Override
72     public RaftActorRecoverySupport newRaftActorRecoverySupport() {
73         return raftActorRecoverySupport != null ? raftActorRecoverySupport : super.newRaftActorRecoverySupport();
74     }
75
76     @Override
77     protected RaftActorSnapshotMessageSupport newRaftActorSnapshotMessageSupport() {
78         return snapshotMessageSupport != null ? snapshotMessageSupport :
79             (snapshotMessageSupport = super.newRaftActorSnapshotMessageSupport());
80     }
81
82     public RaftActorSnapshotMessageSupport getSnapshotMessageSupport() {
83         return snapshotMessageSupport;
84     }
85
86     public void waitForRecoveryComplete() {
87         try {
88             assertEquals("Recovery complete", true, recoveryComplete.await(5,  TimeUnit.SECONDS));
89         } catch (InterruptedException e) {
90             e.printStackTrace();
91         }
92     }
93
94     public void waitForInitializeBehaviorComplete() {
95         try {
96             assertEquals("Behavior initialized", true, initializeBehaviorComplete.await(5,  TimeUnit.SECONDS));
97         } catch (InterruptedException e) {
98             e.printStackTrace();
99         }
100     }
101
102
103     public void waitUntilLeader(){
104         for(int i = 0;i < 10; i++){
105             if(isLeader()){
106                 break;
107             }
108             Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
109         }
110     }
111
112     public List<Object> getState() {
113         return state;
114     }
115
116
117     @Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
118         actorDelegate.applyState(clientActor, identifier, data);
119         LOG.info("{}: applyState called: {}", persistenceId(), data);
120
121         state.add(data);
122     }
123
124     @Override
125     @Nonnull
126     protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
127         return this;
128     }
129
130     @Override
131     protected RaftActorSnapshotCohort getRaftActorSnapshotCohort() {
132         return this;
133     }
134
135     @Override
136     public void startLogRecoveryBatch(int maxBatchSize) {
137     }
138
139     @Override
140     public void appendRecoveredLogEntry(Payload data) {
141         state.add(data);
142     }
143
144     @Override
145     public void applyCurrentLogRecoveryBatch() {
146     }
147
148     @Override
149     protected void onRecoveryComplete() {
150         actorDelegate.onRecoveryComplete();
151         recoveryComplete.countDown();
152     }
153
154     @Override
155     protected void initializeBehavior() {
156         super.initializeBehavior();
157         initializeBehaviorComplete.countDown();
158     }
159
160     @Override
161     public void applyRecoverySnapshot(byte[] bytes) {
162         recoveryCohortDelegate.applyRecoverySnapshot(bytes);
163         applySnapshotBytes(bytes);
164     }
165
166     private void applySnapshotBytes(byte[] bytes) {
167         try {
168             Object data = toObject(bytes);
169             if (data instanceof List) {
170                 state.addAll((List<?>) data);
171             }
172         } catch (Exception e) {
173             e.printStackTrace();
174         }
175     }
176
177     @Override
178     public void createSnapshot(ActorRef actorRef) {
179         LOG.info("{}: createSnapshot called", persistenceId());
180         snapshotCohortDelegate.createSnapshot(actorRef);
181     }
182
183     @Override
184     public void applySnapshot(byte [] snapshot) {
185         LOG.info("{}: applySnapshot called", persistenceId());
186         applySnapshotBytes(snapshot);
187         snapshotCohortDelegate.applySnapshot(snapshot);
188     }
189
190     @Override
191     protected void onStateChanged() {
192         actorDelegate.onStateChanged();
193     }
194
195     @Override
196     protected Optional<ActorRef> getRoleChangeNotifier() {
197         return Optional.fromNullable(roleChangeNotifier);
198     }
199
200     @Override public String persistenceId() {
201         return this.getId();
202     }
203
204     protected void newBehavior(RaftActorBehavior newBehavior) {
205         self().tell(newBehavior, ActorRef.noSender());
206     }
207
208     @Override
209     protected void handleCommand(final Object message) {
210         if(message instanceof RaftActorBehavior) {
211             super.changeCurrentBehavior((RaftActorBehavior)message);
212         } else {
213             super.handleCommand(message);
214
215             if(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT.equals(message)) {
216                 snapshotCommitted.countDown();
217             }
218         }
219     }
220
221     @Override
222     protected void pauseLeader(Runnable operation) {
223         if(pauseLeaderFunction != null) {
224             pauseLeaderFunction.apply(operation);
225         } else {
226             super.pauseLeader(operation);
227         }
228     }
229
230     public static Object toObject(byte[] bs) throws ClassNotFoundException, IOException {
231         Object obj = null;
232         ByteArrayInputStream bis = null;
233         ObjectInputStream ois = null;
234         try {
235             bis = new ByteArrayInputStream(bs);
236             ois = new ObjectInputStream(bis);
237             obj = ois.readObject();
238         } finally {
239             if (bis != null) {
240                 bis.close();
241             }
242             if (ois != null) {
243                 ois.close();
244             }
245         }
246         return obj;
247     }
248
249     public ReplicatedLog getReplicatedLog(){
250         return this.getRaftActorContext().getReplicatedLog();
251     }
252
253     @Override
254     public byte[] getRestoreFromSnapshot() {
255         return restoreFromSnapshot;
256     }
257
258     public static Props props(final String id, final Map<String, String> peerAddresses,
259             ConfigParams config){
260         return builder().id(id).peerAddresses(peerAddresses).config(config).props();
261     }
262
263     public static Props props(final String id, final Map<String, String> peerAddresses,
264                               ConfigParams config, DataPersistenceProvider dataPersistenceProvider){
265         return builder().id(id).peerAddresses(peerAddresses).config(config).
266                 dataPersistenceProvider(dataPersistenceProvider).props();
267     }
268
269     public static Builder builder() {
270         return new Builder();
271     }
272
273     public static class AbstractBuilder<T extends AbstractBuilder<T, A>, A extends MockRaftActor> {
274         private Map<String, String> peerAddresses = Collections.emptyMap();
275         private String id;
276         private ConfigParams config;
277         private DataPersistenceProvider dataPersistenceProvider;
278         private ActorRef roleChangeNotifier;
279         private RaftActorSnapshotMessageSupport snapshotMessageSupport;
280         private byte[] restoreFromSnapshot;
281         private Optional<Boolean> persistent = Optional.absent();
282         private final Class<A> actorClass;
283         private Function<Runnable, Void> pauseLeaderFunction;
284
285         protected AbstractBuilder(Class<A> actorClass) {
286             this.actorClass = actorClass;
287         }
288
289         @SuppressWarnings("unchecked")
290         private T self() {
291             return (T) this;
292         }
293
294         public T id(String id) {
295             this.id = id;
296             return self();
297         }
298
299         public T peerAddresses(Map<String, String> peerAddresses) {
300             this.peerAddresses = peerAddresses;
301             return self();
302         }
303
304         public T config(ConfigParams config) {
305             this.config = config;
306             return self();
307         }
308
309         public T dataPersistenceProvider(DataPersistenceProvider dataPersistenceProvider) {
310             this.dataPersistenceProvider = dataPersistenceProvider;
311             return self();
312         }
313
314         public T roleChangeNotifier(ActorRef roleChangeNotifier) {
315             this.roleChangeNotifier = roleChangeNotifier;
316             return self();
317         }
318
319         public T snapshotMessageSupport(RaftActorSnapshotMessageSupport snapshotMessageSupport) {
320             this.snapshotMessageSupport = snapshotMessageSupport;
321             return self();
322         }
323
324         public T restoreFromSnapshot(byte[] restoreFromSnapshot) {
325             this.restoreFromSnapshot = restoreFromSnapshot;
326             return self();
327         }
328
329         public T persistent(Optional<Boolean> persistent) {
330             this.persistent = persistent;
331             return self();
332         }
333
334         public T pauseLeaderFunction(Function<Runnable, Void> pauseLeaderFunction) {
335             this.pauseLeaderFunction = pauseLeaderFunction;
336             return self();
337         }
338
339         public Props props() {
340             return Props.create(actorClass, this);
341         }
342     }
343
344     public static class Builder extends AbstractBuilder<Builder, MockRaftActor> {
345         private Builder() {
346             super(MockRaftActor.class);
347         }
348     }
349 }