Force install snapshot when follower log is ahead
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / MockRaftActor.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
4  *
5  * This program and the accompanying materials are made available under the
6  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7  * and is available at http://www.eclipse.org/legal/epl-v10.html
8  */
9 package org.opendaylight.controller.cluster.raft;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.mockito.Mockito.mock;
13 import akka.actor.ActorRef;
14 import akka.actor.Props;
15 import com.google.common.base.Function;
16 import com.google.common.base.Optional;
17 import com.google.common.util.concurrent.Uninterruptibles;
18 import java.io.ByteArrayInputStream;
19 import java.io.IOException;
20 import java.io.ObjectInputStream;
21 import java.util.ArrayList;
22 import java.util.Collections;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.concurrent.CountDownLatch;
26 import java.util.concurrent.TimeUnit;
27 import javax.annotation.Nonnull;
28 import org.opendaylight.controller.cluster.DataPersistenceProvider;
29 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
30 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
31 import org.opendaylight.yangtools.concepts.Identifier;
32
33 public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort {
34     public static final short PAYLOAD_VERSION = 5;
35
36     final RaftActor actorDelegate;
37     final RaftActorRecoveryCohort recoveryCohortDelegate;
38     volatile RaftActorSnapshotCohort snapshotCohortDelegate;
39     private final CountDownLatch recoveryComplete = new CountDownLatch(1);
40     private final List<Object> state;
41     private final ActorRef roleChangeNotifier;
42     protected final CountDownLatch initializeBehaviorComplete = new CountDownLatch(1);
43     private RaftActorRecoverySupport raftActorRecoverySupport;
44     private RaftActorSnapshotMessageSupport snapshotMessageSupport;
45     private final byte[] restoreFromSnapshot;
46     final CountDownLatch snapshotCommitted = new CountDownLatch(1);
47     private final Function<Runnable, Void> pauseLeaderFunction;
48
49     protected MockRaftActor(AbstractBuilder<?, ?> builder) {
50         super(builder.id, builder.peerAddresses != null ? builder.peerAddresses :
51             Collections.<String, String>emptyMap(), Optional.fromNullable(builder.config), PAYLOAD_VERSION);
52         state = new ArrayList<>();
53         this.actorDelegate = mock(RaftActor.class);
54         this.recoveryCohortDelegate = mock(RaftActorRecoveryCohort.class);
55         this.snapshotCohortDelegate = mock(RaftActorSnapshotCohort.class);
56
57         if(builder.dataPersistenceProvider == null){
58             setPersistence(builder.persistent.isPresent() ? builder.persistent.get() : true);
59         } else {
60             setPersistence(builder.dataPersistenceProvider);
61         }
62
63         roleChangeNotifier = builder.roleChangeNotifier;
64         snapshotMessageSupport = builder.snapshotMessageSupport;
65         restoreFromSnapshot = builder.restoreFromSnapshot;
66         pauseLeaderFunction = builder.pauseLeaderFunction;
67     }
68
69     public void setRaftActorRecoverySupport(RaftActorRecoverySupport support) {
70         raftActorRecoverySupport = support;
71     }
72
73     @Override
74     public RaftActorRecoverySupport newRaftActorRecoverySupport() {
75         return raftActorRecoverySupport != null ? raftActorRecoverySupport : super.newRaftActorRecoverySupport();
76     }
77
78     @Override
79     protected RaftActorSnapshotMessageSupport newRaftActorSnapshotMessageSupport() {
80         return snapshotMessageSupport != null ? snapshotMessageSupport :
81             (snapshotMessageSupport = super.newRaftActorSnapshotMessageSupport());
82     }
83
84     public RaftActorSnapshotMessageSupport getSnapshotMessageSupport() {
85         return snapshotMessageSupport;
86     }
87
88     public void waitForRecoveryComplete() {
89         try {
90             assertEquals("Recovery complete", true, recoveryComplete.await(5,  TimeUnit.SECONDS));
91         } catch (InterruptedException e) {
92             e.printStackTrace();
93         }
94     }
95
96     public void waitForInitializeBehaviorComplete() {
97         try {
98             assertEquals("Behavior initialized", true, initializeBehaviorComplete.await(5,  TimeUnit.SECONDS));
99         } catch (InterruptedException e) {
100             e.printStackTrace();
101         }
102     }
103
104
105     public void waitUntilLeader(){
106         for(int i = 0;i < 10; i++){
107             if(isLeader()){
108                 break;
109             }
110             Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
111         }
112     }
113
114     public List<Object> getState() {
115         return state;
116     }
117
118     @Override
119     protected void applyState(ActorRef clientActor, Identifier identifier, Object data) {
120         actorDelegate.applyState(clientActor, identifier, data);
121         LOG.info("{}: applyState called: {}", persistenceId(), data);
122
123         state.add(data);
124     }
125
126     @Override
127     @Nonnull
128     protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
129         return this;
130     }
131
132     @Override
133     protected RaftActorSnapshotCohort getRaftActorSnapshotCohort() {
134         return this;
135     }
136
137     @Override
138     public void startLogRecoveryBatch(int maxBatchSize) {
139     }
140
141     @Override
142     public void appendRecoveredLogEntry(Payload data) {
143         state.add(data);
144     }
145
146     @Override
147     public void applyCurrentLogRecoveryBatch() {
148     }
149
150     @Override
151     protected void onRecoveryComplete() {
152         actorDelegate.onRecoveryComplete();
153         recoveryComplete.countDown();
154     }
155
156     @Override
157     protected void initializeBehavior() {
158         super.initializeBehavior();
159         initializeBehaviorComplete.countDown();
160     }
161
162     @Override
163     public void applyRecoverySnapshot(byte[] bytes) {
164         recoveryCohortDelegate.applyRecoverySnapshot(bytes);
165         applySnapshotBytes(bytes);
166     }
167
168     private void applySnapshotBytes(byte[] bytes) {
169         try {
170             Object data = toObject(bytes);
171             if (data instanceof List) {
172                 state.clear();
173                 state.addAll((List<?>) data);
174             }
175         } catch (Exception e) {
176             e.printStackTrace();
177         }
178     }
179
180     @Override
181     public void createSnapshot(ActorRef actorRef) {
182         LOG.info("{}: createSnapshot called", persistenceId());
183         snapshotCohortDelegate.createSnapshot(actorRef);
184     }
185
186     @Override
187     public void applySnapshot(byte [] snapshot) {
188         LOG.info("{}: applySnapshot called", persistenceId());
189         applySnapshotBytes(snapshot);
190         snapshotCohortDelegate.applySnapshot(snapshot);
191     }
192
193     @Override
194     protected void onStateChanged() {
195         actorDelegate.onStateChanged();
196     }
197
198     @Override
199     protected Optional<ActorRef> getRoleChangeNotifier() {
200         return Optional.fromNullable(roleChangeNotifier);
201     }
202
203     @Override public String persistenceId() {
204         return this.getId();
205     }
206
207     protected void newBehavior(RaftActorBehavior newBehavior) {
208         self().tell(newBehavior, ActorRef.noSender());
209     }
210
211     @Override
212     protected void handleCommand(final Object message) {
213         if(message instanceof RaftActorBehavior) {
214             super.changeCurrentBehavior((RaftActorBehavior)message);
215         } else {
216             super.handleCommand(message);
217
218             if(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT.equals(message)) {
219                 snapshotCommitted.countDown();
220             }
221         }
222     }
223
224     @Override
225     protected void pauseLeader(Runnable operation) {
226         if(pauseLeaderFunction != null) {
227             pauseLeaderFunction.apply(operation);
228         } else {
229             super.pauseLeader(operation);
230         }
231     }
232
233     public static Object toObject(byte[] bs) throws ClassNotFoundException, IOException {
234         Object obj = null;
235         ByteArrayInputStream bis = null;
236         ObjectInputStream ois = null;
237         try {
238             bis = new ByteArrayInputStream(bs);
239             ois = new ObjectInputStream(bis);
240             obj = ois.readObject();
241         } finally {
242             if (bis != null) {
243                 bis.close();
244             }
245             if (ois != null) {
246                 ois.close();
247             }
248         }
249         return obj;
250     }
251
252     public ReplicatedLog getReplicatedLog(){
253         return this.getRaftActorContext().getReplicatedLog();
254     }
255
256     @Override
257     public byte[] getRestoreFromSnapshot() {
258         return restoreFromSnapshot;
259     }
260
261     public static Props props(final String id, final Map<String, String> peerAddresses,
262             ConfigParams config){
263         return builder().id(id).peerAddresses(peerAddresses).config(config).props();
264     }
265
266     public static Props props(final String id, final Map<String, String> peerAddresses,
267                               ConfigParams config, DataPersistenceProvider dataPersistenceProvider){
268         return builder().id(id).peerAddresses(peerAddresses).config(config).
269                 dataPersistenceProvider(dataPersistenceProvider).props();
270     }
271
272     public static Builder builder() {
273         return new Builder();
274     }
275
276     public static class AbstractBuilder<T extends AbstractBuilder<T, A>, A extends MockRaftActor> {
277         private Map<String, String> peerAddresses = Collections.emptyMap();
278         private String id;
279         private ConfigParams config;
280         private DataPersistenceProvider dataPersistenceProvider;
281         private ActorRef roleChangeNotifier;
282         private RaftActorSnapshotMessageSupport snapshotMessageSupport;
283         private byte[] restoreFromSnapshot;
284         private Optional<Boolean> persistent = Optional.absent();
285         private final Class<A> actorClass;
286         private Function<Runnable, Void> pauseLeaderFunction;
287
288         protected AbstractBuilder(Class<A> actorClass) {
289             this.actorClass = actorClass;
290         }
291
292         @SuppressWarnings("unchecked")
293         private T self() {
294             return (T) this;
295         }
296
297         public T id(String id) {
298             this.id = id;
299             return self();
300         }
301
302         public T peerAddresses(Map<String, String> peerAddresses) {
303             this.peerAddresses = peerAddresses;
304             return self();
305         }
306
307         public T config(ConfigParams config) {
308             this.config = config;
309             return self();
310         }
311
312         public T dataPersistenceProvider(DataPersistenceProvider dataPersistenceProvider) {
313             this.dataPersistenceProvider = dataPersistenceProvider;
314             return self();
315         }
316
317         public T roleChangeNotifier(ActorRef roleChangeNotifier) {
318             this.roleChangeNotifier = roleChangeNotifier;
319             return self();
320         }
321
322         public T snapshotMessageSupport(RaftActorSnapshotMessageSupport snapshotMessageSupport) {
323             this.snapshotMessageSupport = snapshotMessageSupport;
324             return self();
325         }
326
327         public T restoreFromSnapshot(byte[] restoreFromSnapshot) {
328             this.restoreFromSnapshot = restoreFromSnapshot;
329             return self();
330         }
331
332         public T persistent(Optional<Boolean> persistent) {
333             this.persistent = persistent;
334             return self();
335         }
336
337         public T pauseLeaderFunction(Function<Runnable, Void> pauseLeaderFunction) {
338             this.pauseLeaderFunction = pauseLeaderFunction;
339             return self();
340         }
341
342         public Props props() {
343             return Props.create(actorClass, this);
344         }
345     }
346
347     public static class Builder extends AbstractBuilder<Builder, MockRaftActor> {
348         private Builder() {
349             super(MockRaftActor.class);
350         }
351     }
352 }