BUG-5626: use Identifier instead of String
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / MockRaftActor.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
4  *
5  * This program and the accompanying materials are made available under the
6  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7  * and is available at http://www.eclipse.org/legal/epl-v10.html
8  */
9 package org.opendaylight.controller.cluster.raft;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.mockito.Mockito.mock;
13 import akka.actor.ActorRef;
14 import akka.actor.Props;
15 import com.google.common.base.Function;
16 import com.google.common.base.Optional;
17 import com.google.common.util.concurrent.Uninterruptibles;
18 import java.io.ByteArrayInputStream;
19 import java.io.IOException;
20 import java.io.ObjectInputStream;
21 import java.util.ArrayList;
22 import java.util.Collections;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.concurrent.CountDownLatch;
26 import java.util.concurrent.TimeUnit;
27 import javax.annotation.Nonnull;
28 import org.opendaylight.controller.cluster.DataPersistenceProvider;
29 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
30 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
31 import org.opendaylight.yangtools.concepts.Identifier;
32
33 public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort {
34     public static final short PAYLOAD_VERSION = 5;
35
36     final RaftActor actorDelegate;
37     final RaftActorRecoveryCohort recoveryCohortDelegate;
38     volatile RaftActorSnapshotCohort snapshotCohortDelegate;
39     private final CountDownLatch recoveryComplete = new CountDownLatch(1);
40     private final List<Object> state;
41     private final ActorRef roleChangeNotifier;
42     protected final CountDownLatch initializeBehaviorComplete = new CountDownLatch(1);
43     private RaftActorRecoverySupport raftActorRecoverySupport;
44     private RaftActorSnapshotMessageSupport snapshotMessageSupport;
45     private final byte[] restoreFromSnapshot;
46     final CountDownLatch snapshotCommitted = new CountDownLatch(1);
47     private final Function<Runnable, Void> pauseLeaderFunction;
48
49     protected MockRaftActor(AbstractBuilder<?, ?> builder) {
50         super(builder.id, builder.peerAddresses, Optional.fromNullable(builder.config), PAYLOAD_VERSION);
51         state = new ArrayList<>();
52         this.actorDelegate = mock(RaftActor.class);
53         this.recoveryCohortDelegate = mock(RaftActorRecoveryCohort.class);
54         this.snapshotCohortDelegate = mock(RaftActorSnapshotCohort.class);
55
56         if(builder.dataPersistenceProvider == null){
57             setPersistence(builder.persistent.isPresent() ? builder.persistent.get() : true);
58         } else {
59             setPersistence(builder.dataPersistenceProvider);
60         }
61
62         roleChangeNotifier = builder.roleChangeNotifier;
63         snapshotMessageSupport = builder.snapshotMessageSupport;
64         restoreFromSnapshot = builder.restoreFromSnapshot;
65         pauseLeaderFunction = builder.pauseLeaderFunction;
66     }
67
68     public void setRaftActorRecoverySupport(RaftActorRecoverySupport support) {
69         raftActorRecoverySupport = support;
70     }
71
72     @Override
73     public RaftActorRecoverySupport newRaftActorRecoverySupport() {
74         return raftActorRecoverySupport != null ? raftActorRecoverySupport : super.newRaftActorRecoverySupport();
75     }
76
77     @Override
78     protected RaftActorSnapshotMessageSupport newRaftActorSnapshotMessageSupport() {
79         return snapshotMessageSupport != null ? snapshotMessageSupport :
80             (snapshotMessageSupport = super.newRaftActorSnapshotMessageSupport());
81     }
82
83     public RaftActorSnapshotMessageSupport getSnapshotMessageSupport() {
84         return snapshotMessageSupport;
85     }
86
87     public void waitForRecoveryComplete() {
88         try {
89             assertEquals("Recovery complete", true, recoveryComplete.await(5,  TimeUnit.SECONDS));
90         } catch (InterruptedException e) {
91             e.printStackTrace();
92         }
93     }
94
95     public void waitForInitializeBehaviorComplete() {
96         try {
97             assertEquals("Behavior initialized", true, initializeBehaviorComplete.await(5,  TimeUnit.SECONDS));
98         } catch (InterruptedException e) {
99             e.printStackTrace();
100         }
101     }
102
103
104     public void waitUntilLeader(){
105         for(int i = 0;i < 10; i++){
106             if(isLeader()){
107                 break;
108             }
109             Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
110         }
111     }
112
113     public List<Object> getState() {
114         return state;
115     }
116
117
118     @Override
119     protected void applyState(ActorRef clientActor, Identifier identifier, Object data) {
120         actorDelegate.applyState(clientActor, identifier, data);
121         LOG.info("{}: applyState called: {}", persistenceId(), data);
122
123         state.add(data);
124     }
125
126     @Override
127     @Nonnull
128     protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
129         return this;
130     }
131
132     @Override
133     protected RaftActorSnapshotCohort getRaftActorSnapshotCohort() {
134         return this;
135     }
136
137     @Override
138     public void startLogRecoveryBatch(int maxBatchSize) {
139     }
140
141     @Override
142     public void appendRecoveredLogEntry(Payload data) {
143         state.add(data);
144     }
145
146     @Override
147     public void applyCurrentLogRecoveryBatch() {
148     }
149
150     @Override
151     protected void onRecoveryComplete() {
152         actorDelegate.onRecoveryComplete();
153         recoveryComplete.countDown();
154     }
155
156     @Override
157     protected void initializeBehavior() {
158         super.initializeBehavior();
159         initializeBehaviorComplete.countDown();
160     }
161
162     @Override
163     public void applyRecoverySnapshot(byte[] bytes) {
164         recoveryCohortDelegate.applyRecoverySnapshot(bytes);
165         applySnapshotBytes(bytes);
166     }
167
168     private void applySnapshotBytes(byte[] bytes) {
169         try {
170             Object data = toObject(bytes);
171             if (data instanceof List) {
172                 state.addAll((List<?>) data);
173             }
174         } catch (Exception e) {
175             e.printStackTrace();
176         }
177     }
178
179     @Override
180     public void createSnapshot(ActorRef actorRef) {
181         LOG.info("{}: createSnapshot called", persistenceId());
182         snapshotCohortDelegate.createSnapshot(actorRef);
183     }
184
185     @Override
186     public void applySnapshot(byte [] snapshot) {
187         LOG.info("{}: applySnapshot called", persistenceId());
188         applySnapshotBytes(snapshot);
189         snapshotCohortDelegate.applySnapshot(snapshot);
190     }
191
192     @Override
193     protected void onStateChanged() {
194         actorDelegate.onStateChanged();
195     }
196
197     @Override
198     protected Optional<ActorRef> getRoleChangeNotifier() {
199         return Optional.fromNullable(roleChangeNotifier);
200     }
201
202     @Override public String persistenceId() {
203         return this.getId();
204     }
205
206     protected void newBehavior(RaftActorBehavior newBehavior) {
207         self().tell(newBehavior, ActorRef.noSender());
208     }
209
210     @Override
211     protected void handleCommand(final Object message) {
212         if(message instanceof RaftActorBehavior) {
213             super.changeCurrentBehavior((RaftActorBehavior)message);
214         } else {
215             super.handleCommand(message);
216
217             if(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT.equals(message)) {
218                 snapshotCommitted.countDown();
219             }
220         }
221     }
222
223     @Override
224     protected void pauseLeader(Runnable operation) {
225         if(pauseLeaderFunction != null) {
226             pauseLeaderFunction.apply(operation);
227         } else {
228             super.pauseLeader(operation);
229         }
230     }
231
232     public static Object toObject(byte[] bs) throws ClassNotFoundException, IOException {
233         Object obj = null;
234         ByteArrayInputStream bis = null;
235         ObjectInputStream ois = null;
236         try {
237             bis = new ByteArrayInputStream(bs);
238             ois = new ObjectInputStream(bis);
239             obj = ois.readObject();
240         } finally {
241             if (bis != null) {
242                 bis.close();
243             }
244             if (ois != null) {
245                 ois.close();
246             }
247         }
248         return obj;
249     }
250
251     public ReplicatedLog getReplicatedLog(){
252         return this.getRaftActorContext().getReplicatedLog();
253     }
254
255     @Override
256     public byte[] getRestoreFromSnapshot() {
257         return restoreFromSnapshot;
258     }
259
260     public static Props props(final String id, final Map<String, String> peerAddresses,
261             ConfigParams config){
262         return builder().id(id).peerAddresses(peerAddresses).config(config).props();
263     }
264
265     public static Props props(final String id, final Map<String, String> peerAddresses,
266                               ConfigParams config, DataPersistenceProvider dataPersistenceProvider){
267         return builder().id(id).peerAddresses(peerAddresses).config(config).
268                 dataPersistenceProvider(dataPersistenceProvider).props();
269     }
270
271     public static Builder builder() {
272         return new Builder();
273     }
274
275     public static class AbstractBuilder<T extends AbstractBuilder<T, A>, A extends MockRaftActor> {
276         private Map<String, String> peerAddresses = Collections.emptyMap();
277         private String id;
278         private ConfigParams config;
279         private DataPersistenceProvider dataPersistenceProvider;
280         private ActorRef roleChangeNotifier;
281         private RaftActorSnapshotMessageSupport snapshotMessageSupport;
282         private byte[] restoreFromSnapshot;
283         private Optional<Boolean> persistent = Optional.absent();
284         private final Class<A> actorClass;
285         private Function<Runnable, Void> pauseLeaderFunction;
286
287         protected AbstractBuilder(Class<A> actorClass) {
288             this.actorClass = actorClass;
289         }
290
291         @SuppressWarnings("unchecked")
292         private T self() {
293             return (T) this;
294         }
295
296         public T id(String id) {
297             this.id = id;
298             return self();
299         }
300
301         public T peerAddresses(Map<String, String> peerAddresses) {
302             this.peerAddresses = peerAddresses;
303             return self();
304         }
305
306         public T config(ConfigParams config) {
307             this.config = config;
308             return self();
309         }
310
311         public T dataPersistenceProvider(DataPersistenceProvider dataPersistenceProvider) {
312             this.dataPersistenceProvider = dataPersistenceProvider;
313             return self();
314         }
315
316         public T roleChangeNotifier(ActorRef roleChangeNotifier) {
317             this.roleChangeNotifier = roleChangeNotifier;
318             return self();
319         }
320
321         public T snapshotMessageSupport(RaftActorSnapshotMessageSupport snapshotMessageSupport) {
322             this.snapshotMessageSupport = snapshotMessageSupport;
323             return self();
324         }
325
326         public T restoreFromSnapshot(byte[] restoreFromSnapshot) {
327             this.restoreFromSnapshot = restoreFromSnapshot;
328             return self();
329         }
330
331         public T persistent(Optional<Boolean> persistent) {
332             this.persistent = persistent;
333             return self();
334         }
335
336         public T pauseLeaderFunction(Function<Runnable, Void> pauseLeaderFunction) {
337             this.pauseLeaderFunction = pauseLeaderFunction;
338             return self();
339         }
340
341         public Props props() {
342             return Props.create(actorClass, this);
343         }
344     }
345
346     public static class Builder extends AbstractBuilder<Builder, MockRaftActor> {
347         private Builder() {
348             super(MockRaftActor.class);
349         }
350     }
351 }