Take snapshot after recovery on migrated messages
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / MockRaftActor.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
4  *
5  * This program and the accompanying materials are made available under the
6  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7  * and is available at http://www.eclipse.org/legal/epl-v10.html
8  */
9 package org.opendaylight.controller.cluster.raft;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.mockito.Mockito.mock;
13 import akka.actor.ActorRef;
14 import akka.actor.Props;
15 import com.google.common.base.Function;
16 import com.google.common.base.Optional;
17 import com.google.common.util.concurrent.Uninterruptibles;
18 import java.io.ByteArrayInputStream;
19 import java.io.IOException;
20 import java.io.ObjectInputStream;
21 import java.util.ArrayList;
22 import java.util.Collections;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.concurrent.CountDownLatch;
26 import java.util.concurrent.TimeUnit;
27 import javax.annotation.Nonnull;
28 import org.opendaylight.controller.cluster.DataPersistenceProvider;
29 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
30 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
31 import org.opendaylight.yangtools.concepts.Identifier;
32
33 public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort {
34     public static final short PAYLOAD_VERSION = 5;
35
36     final RaftActor actorDelegate;
37     final RaftActorRecoveryCohort recoveryCohortDelegate;
38     volatile RaftActorSnapshotCohort snapshotCohortDelegate;
39     private final CountDownLatch recoveryComplete = new CountDownLatch(1);
40     private final List<Object> state;
41     private final ActorRef roleChangeNotifier;
42     protected final CountDownLatch initializeBehaviorComplete = new CountDownLatch(1);
43     private RaftActorRecoverySupport raftActorRecoverySupport;
44     private RaftActorSnapshotMessageSupport snapshotMessageSupport;
45     private final byte[] restoreFromSnapshot;
46     final CountDownLatch snapshotCommitted = new CountDownLatch(1);
47     private final Function<Runnable, Void> pauseLeaderFunction;
48
49     protected MockRaftActor(AbstractBuilder<?, ?> builder) {
50         super(builder.id, builder.peerAddresses != null ? builder.peerAddresses :
51             Collections.<String, String>emptyMap(), Optional.fromNullable(builder.config), PAYLOAD_VERSION);
52         state = new ArrayList<>();
53         this.actorDelegate = mock(RaftActor.class);
54         this.recoveryCohortDelegate = mock(RaftActorRecoveryCohort.class);
55
56         this.snapshotCohortDelegate = builder.snapshotCohort != null ? builder.snapshotCohort :
57             mock(RaftActorSnapshotCohort.class);
58
59         if(builder.dataPersistenceProvider == null){
60             setPersistence(builder.persistent.isPresent() ? builder.persistent.get() : true);
61         } else {
62             setPersistence(builder.dataPersistenceProvider);
63         }
64
65         roleChangeNotifier = builder.roleChangeNotifier;
66         snapshotMessageSupport = builder.snapshotMessageSupport;
67         restoreFromSnapshot = builder.restoreFromSnapshot;
68         pauseLeaderFunction = builder.pauseLeaderFunction;
69     }
70
71     public void setRaftActorRecoverySupport(RaftActorRecoverySupport support) {
72         raftActorRecoverySupport = support;
73     }
74
75     @Override
76     public RaftActorRecoverySupport newRaftActorRecoverySupport() {
77         return raftActorRecoverySupport != null ? raftActorRecoverySupport : super.newRaftActorRecoverySupport();
78     }
79
80     @Override
81     protected RaftActorSnapshotMessageSupport newRaftActorSnapshotMessageSupport() {
82         return snapshotMessageSupport != null ? snapshotMessageSupport :
83             (snapshotMessageSupport = super.newRaftActorSnapshotMessageSupport());
84     }
85
86     public RaftActorSnapshotMessageSupport getSnapshotMessageSupport() {
87         return snapshotMessageSupport;
88     }
89
90     public void waitForRecoveryComplete() {
91         try {
92             assertEquals("Recovery complete", true, recoveryComplete.await(5,  TimeUnit.SECONDS));
93         } catch (InterruptedException e) {
94             e.printStackTrace();
95         }
96     }
97
98     public void waitForInitializeBehaviorComplete() {
99         try {
100             assertEquals("Behavior initialized", true, initializeBehaviorComplete.await(5,  TimeUnit.SECONDS));
101         } catch (InterruptedException e) {
102             e.printStackTrace();
103         }
104     }
105
106
107     public void waitUntilLeader(){
108         for(int i = 0;i < 10; i++){
109             if(isLeader()){
110                 break;
111             }
112             Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
113         }
114     }
115
116     public List<Object> getState() {
117         return state;
118     }
119
120     @Override
121     protected void applyState(ActorRef clientActor, Identifier identifier, Object data) {
122         actorDelegate.applyState(clientActor, identifier, data);
123         LOG.info("{}: applyState called: {}", persistenceId(), data);
124
125         state.add(data);
126     }
127
128     @Override
129     @Nonnull
130     protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
131         return this;
132     }
133
134     @Override
135     protected RaftActorSnapshotCohort getRaftActorSnapshotCohort() {
136         return this;
137     }
138
139     @Override
140     public void startLogRecoveryBatch(int maxBatchSize) {
141     }
142
143     @Override
144     public void appendRecoveredLogEntry(Payload data) {
145         state.add(data);
146     }
147
148     @Override
149     public void applyCurrentLogRecoveryBatch() {
150     }
151
152     @Override
153     protected void onRecoveryComplete() {
154         actorDelegate.onRecoveryComplete();
155         recoveryComplete.countDown();
156     }
157
158     @Override
159     protected void initializeBehavior() {
160         super.initializeBehavior();
161         initializeBehaviorComplete.countDown();
162     }
163
164     @Override
165     public void applyRecoverySnapshot(byte[] bytes) {
166         recoveryCohortDelegate.applyRecoverySnapshot(bytes);
167         applySnapshotBytes(bytes);
168     }
169
170     private void applySnapshotBytes(byte[] bytes) {
171         try {
172             Object data = toObject(bytes);
173             if (data instanceof List) {
174                 state.clear();
175                 state.addAll((List<?>) data);
176             }
177         } catch (Exception e) {
178             e.printStackTrace();
179         }
180     }
181
182     @Override
183     public void createSnapshot(ActorRef actorRef) {
184         LOG.info("{}: createSnapshot called", persistenceId());
185         snapshotCohortDelegate.createSnapshot(actorRef);
186     }
187
188     @Override
189     public void applySnapshot(byte [] snapshot) {
190         LOG.info("{}: applySnapshot called", persistenceId());
191         applySnapshotBytes(snapshot);
192         snapshotCohortDelegate.applySnapshot(snapshot);
193     }
194
195     @Override
196     protected void onStateChanged() {
197         actorDelegate.onStateChanged();
198     }
199
200     @Override
201     protected Optional<ActorRef> getRoleChangeNotifier() {
202         return Optional.fromNullable(roleChangeNotifier);
203     }
204
205     @Override public String persistenceId() {
206         return this.getId();
207     }
208
209     protected void newBehavior(RaftActorBehavior newBehavior) {
210         self().tell(newBehavior, ActorRef.noSender());
211     }
212
213     @Override
214     protected void handleCommand(final Object message) {
215         if(message instanceof RaftActorBehavior) {
216             super.changeCurrentBehavior((RaftActorBehavior)message);
217         } else {
218             super.handleCommand(message);
219
220             if(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT.equals(message)) {
221                 snapshotCommitted.countDown();
222             }
223         }
224     }
225
226     @Override
227     protected void pauseLeader(Runnable operation) {
228         if(pauseLeaderFunction != null) {
229             pauseLeaderFunction.apply(operation);
230         } else {
231             super.pauseLeader(operation);
232         }
233     }
234
235     public static Object toObject(byte[] bs) throws ClassNotFoundException, IOException {
236         Object obj = null;
237         ByteArrayInputStream bis = null;
238         ObjectInputStream ois = null;
239         try {
240             bis = new ByteArrayInputStream(bs);
241             ois = new ObjectInputStream(bis);
242             obj = ois.readObject();
243         } finally {
244             if (bis != null) {
245                 bis.close();
246             }
247             if (ois != null) {
248                 ois.close();
249             }
250         }
251         return obj;
252     }
253
254     public ReplicatedLog getReplicatedLog(){
255         return this.getRaftActorContext().getReplicatedLog();
256     }
257
258     @Override
259     public byte[] getRestoreFromSnapshot() {
260         return restoreFromSnapshot;
261     }
262
263     public static Props props(final String id, final Map<String, String> peerAddresses,
264             ConfigParams config){
265         return builder().id(id).peerAddresses(peerAddresses).config(config).props();
266     }
267
268     public static Props props(final String id, final Map<String, String> peerAddresses,
269                               ConfigParams config, DataPersistenceProvider dataPersistenceProvider){
270         return builder().id(id).peerAddresses(peerAddresses).config(config).
271                 dataPersistenceProvider(dataPersistenceProvider).props();
272     }
273
274     public static Builder builder() {
275         return new Builder();
276     }
277
278     public static class AbstractBuilder<T extends AbstractBuilder<T, A>, A extends MockRaftActor> {
279         private Map<String, String> peerAddresses = Collections.emptyMap();
280         private String id;
281         private ConfigParams config;
282         private DataPersistenceProvider dataPersistenceProvider;
283         private ActorRef roleChangeNotifier;
284         private RaftActorSnapshotMessageSupport snapshotMessageSupport;
285         private byte[] restoreFromSnapshot;
286         private Optional<Boolean> persistent = Optional.absent();
287         private final Class<A> actorClass;
288         private Function<Runnable, Void> pauseLeaderFunction;
289         private RaftActorSnapshotCohort snapshotCohort;
290
291         protected AbstractBuilder(Class<A> actorClass) {
292             this.actorClass = actorClass;
293         }
294
295         @SuppressWarnings("unchecked")
296         private T self() {
297             return (T) this;
298         }
299
300         public T id(String id) {
301             this.id = id;
302             return self();
303         }
304
305         public T peerAddresses(Map<String, String> peerAddresses) {
306             this.peerAddresses = peerAddresses;
307             return self();
308         }
309
310         public T config(ConfigParams config) {
311             this.config = config;
312             return self();
313         }
314
315         public T dataPersistenceProvider(DataPersistenceProvider dataPersistenceProvider) {
316             this.dataPersistenceProvider = dataPersistenceProvider;
317             return self();
318         }
319
320         public T roleChangeNotifier(ActorRef roleChangeNotifier) {
321             this.roleChangeNotifier = roleChangeNotifier;
322             return self();
323         }
324
325         public T snapshotMessageSupport(RaftActorSnapshotMessageSupport snapshotMessageSupport) {
326             this.snapshotMessageSupport = snapshotMessageSupport;
327             return self();
328         }
329
330         public T restoreFromSnapshot(byte[] restoreFromSnapshot) {
331             this.restoreFromSnapshot = restoreFromSnapshot;
332             return self();
333         }
334
335         public T persistent(Optional<Boolean> persistent) {
336             this.persistent = persistent;
337             return self();
338         }
339
340         public T pauseLeaderFunction(Function<Runnable, Void> pauseLeaderFunction) {
341             this.pauseLeaderFunction = pauseLeaderFunction;
342             return self();
343         }
344
345         public T snapshotCohort(RaftActorSnapshotCohort snapshotCohort) {
346             this.snapshotCohort = snapshotCohort;
347             return self();
348         }
349
350         public Props props() {
351             return Props.create(actorClass, this);
352         }
353     }
354
355     public static class Builder extends AbstractBuilder<Builder, MockRaftActor> {
356         private Builder() {
357             super(MockRaftActor.class);
358         }
359     }
360 }