Add ShutDown message to RaftActor to transfer leader
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / MockRaftActor.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
4  *
5  * This program and the accompanying materials are made available under the
6  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7  * and is available at http://www.eclipse.org/legal/epl-v10.html
8  */
9 package org.opendaylight.controller.cluster.raft;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.mockito.Mockito.mock;
13 import akka.actor.ActorRef;
14 import akka.actor.Props;
15 import com.google.common.base.Optional;
16 import com.google.common.util.concurrent.Uninterruptibles;
17 import java.io.ByteArrayInputStream;
18 import java.io.IOException;
19 import java.io.ObjectInputStream;
20 import java.util.ArrayList;
21 import java.util.Collections;
22 import java.util.List;
23 import java.util.Map;
24 import java.util.concurrent.CountDownLatch;
25 import java.util.concurrent.TimeUnit;
26 import javax.annotation.Nonnull;
27 import org.opendaylight.controller.cluster.DataPersistenceProvider;
28 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
29 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
30
31 public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort {
32
33     public static final short PAYLOAD_VERSION = 5;
34
35     final RaftActor actorDelegate;
36     final RaftActorRecoveryCohort recoveryCohortDelegate;
37     volatile RaftActorSnapshotCohort snapshotCohortDelegate;
38     private final CountDownLatch recoveryComplete = new CountDownLatch(1);
39     private final List<Object> state;
40     private final ActorRef roleChangeNotifier;
41     protected final CountDownLatch initializeBehaviorComplete = new CountDownLatch(1);
42     private RaftActorRecoverySupport raftActorRecoverySupport;
43     private RaftActorSnapshotMessageSupport snapshotMessageSupport;
44     private final byte[] restoreFromSnapshot;
45     final CountDownLatch snapshotCommitted = new CountDownLatch(1);
46
47     protected MockRaftActor(AbstractBuilder<?, ?> builder) {
48         super(builder.id, builder.peerAddresses, Optional.fromNullable(builder.config), PAYLOAD_VERSION);
49         state = new ArrayList<>();
50         this.actorDelegate = mock(RaftActor.class);
51         this.recoveryCohortDelegate = mock(RaftActorRecoveryCohort.class);
52         this.snapshotCohortDelegate = mock(RaftActorSnapshotCohort.class);
53
54         if(builder.dataPersistenceProvider == null){
55             setPersistence(builder.persistent.isPresent() ? builder.persistent.get() : true);
56         } else {
57             setPersistence(builder.dataPersistenceProvider);
58         }
59
60         roleChangeNotifier = builder.roleChangeNotifier;
61         snapshotMessageSupport = builder.snapshotMessageSupport;
62         restoreFromSnapshot = builder.restoreFromSnapshot;
63     }
64
65     public void setRaftActorRecoverySupport(RaftActorRecoverySupport support) {
66         raftActorRecoverySupport = support;
67     }
68
69     @Override
70     public RaftActorRecoverySupport newRaftActorRecoverySupport() {
71         return raftActorRecoverySupport != null ? raftActorRecoverySupport : super.newRaftActorRecoverySupport();
72     }
73
74     @Override
75     protected RaftActorSnapshotMessageSupport newRaftActorSnapshotMessageSupport() {
76         return snapshotMessageSupport != null ? snapshotMessageSupport :
77             (snapshotMessageSupport = super.newRaftActorSnapshotMessageSupport());
78     }
79
80     public RaftActorSnapshotMessageSupport getSnapshotMessageSupport() {
81         return snapshotMessageSupport;
82     }
83
84     public void waitForRecoveryComplete() {
85         try {
86             assertEquals("Recovery complete", true, recoveryComplete.await(5,  TimeUnit.SECONDS));
87         } catch (InterruptedException e) {
88             e.printStackTrace();
89         }
90     }
91
92     public void waitForInitializeBehaviorComplete() {
93         try {
94             assertEquals("Behavior initialized", true, initializeBehaviorComplete.await(5,  TimeUnit.SECONDS));
95         } catch (InterruptedException e) {
96             e.printStackTrace();
97         }
98     }
99
100
101     public void waitUntilLeader(){
102         for(int i = 0;i < 10; i++){
103             if(isLeader()){
104                 break;
105             }
106             Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
107         }
108     }
109
110     public List<Object> getState() {
111         return state;
112     }
113
114
115     @Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
116         actorDelegate.applyState(clientActor, identifier, data);
117         LOG.info("{}: applyState called: {}", persistenceId(), data);
118
119         state.add(data);
120     }
121
122     @Override
123     @Nonnull
124     protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
125         return this;
126     }
127
128     @Override
129     protected RaftActorSnapshotCohort getRaftActorSnapshotCohort() {
130         return this;
131     }
132
133     @Override
134     public void startLogRecoveryBatch(int maxBatchSize) {
135     }
136
137     @Override
138     public void appendRecoveredLogEntry(Payload data) {
139         state.add(data);
140     }
141
142     @Override
143     public void applyCurrentLogRecoveryBatch() {
144     }
145
146     @Override
147     protected void onRecoveryComplete() {
148         actorDelegate.onRecoveryComplete();
149         recoveryComplete.countDown();
150     }
151
152     @Override
153     protected void initializeBehavior() {
154         super.initializeBehavior();
155         initializeBehaviorComplete.countDown();
156     }
157
158     @Override
159     public void applyRecoverySnapshot(byte[] bytes) {
160         recoveryCohortDelegate.applyRecoverySnapshot(bytes);
161         applySnapshotBytes(bytes);
162     }
163
164     private void applySnapshotBytes(byte[] bytes) {
165         try {
166             Object data = toObject(bytes);
167             if (data instanceof List) {
168                 state.addAll((List<?>) data);
169             }
170         } catch (Exception e) {
171             e.printStackTrace();
172         }
173     }
174
175     @Override
176     public void createSnapshot(ActorRef actorRef) {
177         LOG.info("{}: createSnapshot called", persistenceId());
178         snapshotCohortDelegate.createSnapshot(actorRef);
179     }
180
181     @Override
182     public void applySnapshot(byte [] snapshot) {
183         LOG.info("{}: applySnapshot called", persistenceId());
184         applySnapshotBytes(snapshot);
185         snapshotCohortDelegate.applySnapshot(snapshot);
186     }
187
188     @Override
189     protected void onStateChanged() {
190         actorDelegate.onStateChanged();
191     }
192
193     @Override
194     protected Optional<ActorRef> getRoleChangeNotifier() {
195         return Optional.fromNullable(roleChangeNotifier);
196     }
197
198     @Override public String persistenceId() {
199         return this.getId();
200     }
201
202     protected void newBehavior(RaftActorBehavior newBehavior) {
203         self().tell(newBehavior, ActorRef.noSender());
204     }
205
206     @Override
207     public void handleCommand(final Object message) {
208         if(message instanceof RaftActorBehavior) {
209             super.changeCurrentBehavior((RaftActorBehavior)message);
210         } else {
211             super.handleCommand(message);
212
213             if(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT.equals(message)) {
214                 snapshotCommitted.countDown();
215             }
216         }
217     }
218
219     public static Object toObject(byte[] bs) throws ClassNotFoundException, IOException {
220         Object obj = null;
221         ByteArrayInputStream bis = null;
222         ObjectInputStream ois = null;
223         try {
224             bis = new ByteArrayInputStream(bs);
225             ois = new ObjectInputStream(bis);
226             obj = ois.readObject();
227         } finally {
228             if (bis != null) {
229                 bis.close();
230             }
231             if (ois != null) {
232                 ois.close();
233             }
234         }
235         return obj;
236     }
237
238     public ReplicatedLog getReplicatedLog(){
239         return this.getRaftActorContext().getReplicatedLog();
240     }
241
242     @Override
243     public byte[] getRestoreFromSnapshot() {
244         return restoreFromSnapshot;
245     }
246
247     public static Props props(final String id, final Map<String, String> peerAddresses,
248             ConfigParams config){
249         return builder().id(id).peerAddresses(peerAddresses).config(config).props();
250     }
251
252     public static Props props(final String id, final Map<String, String> peerAddresses,
253                               ConfigParams config, DataPersistenceProvider dataPersistenceProvider){
254         return builder().id(id).peerAddresses(peerAddresses).config(config).
255                 dataPersistenceProvider(dataPersistenceProvider).props();
256     }
257
258     public static Builder builder() {
259         return new Builder();
260     }
261
262     public static class AbstractBuilder<T extends AbstractBuilder<T, A>, A extends MockRaftActor> {
263         private Map<String, String> peerAddresses = Collections.emptyMap();
264         private String id;
265         private ConfigParams config;
266         private DataPersistenceProvider dataPersistenceProvider;
267         private ActorRef roleChangeNotifier;
268         private RaftActorSnapshotMessageSupport snapshotMessageSupport;
269         private byte[] restoreFromSnapshot;
270         private Optional<Boolean> persistent = Optional.absent();
271         private final Class<A> actorClass;
272
273         protected AbstractBuilder(Class<A> actorClass) {
274             this.actorClass = actorClass;
275         }
276
277         @SuppressWarnings("unchecked")
278         private T self() {
279             return (T) this;
280         }
281
282         public T id(String id) {
283             this.id = id;
284             return self();
285         }
286
287         public T peerAddresses(Map<String, String> peerAddresses) {
288             this.peerAddresses = peerAddresses;
289             return self();
290         }
291
292         public T config(ConfigParams config) {
293             this.config = config;
294             return self();
295         }
296
297         public T dataPersistenceProvider(DataPersistenceProvider dataPersistenceProvider) {
298             this.dataPersistenceProvider = dataPersistenceProvider;
299             return self();
300         }
301
302         public T roleChangeNotifier(ActorRef roleChangeNotifier) {
303             this.roleChangeNotifier = roleChangeNotifier;
304             return self();
305         }
306
307         public T snapshotMessageSupport(RaftActorSnapshotMessageSupport snapshotMessageSupport) {
308             this.snapshotMessageSupport = snapshotMessageSupport;
309             return self();
310         }
311
312         public T restoreFromSnapshot(byte[] restoreFromSnapshot) {
313             this.restoreFromSnapshot = restoreFromSnapshot;
314             return self();
315         }
316
317         public T persistent(Optional<Boolean> persistent) {
318             this.persistent = persistent;
319             return self();
320         }
321
322         public Props props() {
323             return Props.create(actorClass, this);
324         }
325     }
326
327     public static class Builder extends AbstractBuilder<Builder, MockRaftActor> {
328         private Builder() {
329             super(MockRaftActor.class);
330         }
331     }
332 }