b4b558b6f357a908c8ac537dd80e86c3abdd5eca
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / MockRaftActor.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
4  *
5  * This program and the accompanying materials are made available under the
6  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7  * and is available at http://www.eclipse.org/legal/epl-v10.html
8  */
9 package org.opendaylight.controller.cluster.raft;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.mockito.Mockito.mock;
13
14 import akka.actor.ActorRef;
15 import akka.actor.Props;
16 import com.google.common.base.Function;
17 import com.google.common.base.Optional;
18 import com.google.common.base.Throwables;
19 import com.google.common.util.concurrent.Uninterruptibles;
20 import java.io.ByteArrayInputStream;
21 import java.io.IOException;
22 import java.io.ObjectInputStream;
23 import java.util.ArrayList;
24 import java.util.Collections;
25 import java.util.List;
26 import java.util.Map;
27 import java.util.concurrent.CountDownLatch;
28 import java.util.concurrent.TimeUnit;
29 import javax.annotation.Nonnull;
30 import org.opendaylight.controller.cluster.DataPersistenceProvider;
31 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
32 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
33 import org.opendaylight.yangtools.concepts.Identifier;
34
35 public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort {
36     public static final short PAYLOAD_VERSION = 5;
37
38     final RaftActor actorDelegate;
39     final RaftActorRecoveryCohort recoveryCohortDelegate;
40     volatile RaftActorSnapshotCohort snapshotCohortDelegate;
41     private final CountDownLatch recoveryComplete = new CountDownLatch(1);
42     private final List<Object> state;
43     private final ActorRef roleChangeNotifier;
44     protected final CountDownLatch initializeBehaviorComplete = new CountDownLatch(1);
45     private RaftActorRecoverySupport raftActorRecoverySupport;
46     private RaftActorSnapshotMessageSupport snapshotMessageSupport;
47     private final byte[] restoreFromSnapshot;
48     final CountDownLatch snapshotCommitted = new CountDownLatch(1);
49     private final Function<Runnable, Void> pauseLeaderFunction;
50
51     protected MockRaftActor(AbstractBuilder<?, ?> builder) {
52         super(builder.id, builder.peerAddresses != null ? builder.peerAddresses :
53             Collections.<String, String>emptyMap(), Optional.fromNullable(builder.config), PAYLOAD_VERSION);
54         state = new ArrayList<>();
55         this.actorDelegate = mock(RaftActor.class);
56         this.recoveryCohortDelegate = mock(RaftActorRecoveryCohort.class);
57
58         this.snapshotCohortDelegate = builder.snapshotCohort != null ? builder.snapshotCohort :
59             mock(RaftActorSnapshotCohort.class);
60
61         if (builder.dataPersistenceProvider == null) {
62             setPersistence(builder.persistent.isPresent() ? builder.persistent.get() : true);
63         } else {
64             setPersistence(builder.dataPersistenceProvider);
65         }
66
67         roleChangeNotifier = builder.roleChangeNotifier;
68         snapshotMessageSupport = builder.snapshotMessageSupport;
69         restoreFromSnapshot = builder.restoreFromSnapshot;
70         pauseLeaderFunction = builder.pauseLeaderFunction;
71     }
72
73     public void setRaftActorRecoverySupport(RaftActorRecoverySupport support) {
74         raftActorRecoverySupport = support;
75     }
76
77     @Override
78     public RaftActorRecoverySupport newRaftActorRecoverySupport() {
79         return raftActorRecoverySupport != null ? raftActorRecoverySupport : super.newRaftActorRecoverySupport();
80     }
81
82     @Override
83     protected RaftActorSnapshotMessageSupport newRaftActorSnapshotMessageSupport() {
84         return snapshotMessageSupport != null ? snapshotMessageSupport :
85             (snapshotMessageSupport = super.newRaftActorSnapshotMessageSupport());
86     }
87
88     public RaftActorSnapshotMessageSupport getSnapshotMessageSupport() {
89         return snapshotMessageSupport;
90     }
91
92     public void waitForRecoveryComplete() {
93         try {
94             assertEquals("Recovery complete", true, recoveryComplete.await(5,  TimeUnit.SECONDS));
95         } catch (InterruptedException e) {
96             Throwables.propagate(e);
97         }
98     }
99
100     public void waitForInitializeBehaviorComplete() {
101         try {
102             assertEquals("Behavior initialized", true, initializeBehaviorComplete.await(5,  TimeUnit.SECONDS));
103         } catch (InterruptedException e) {
104             Throwables.propagate(e);
105         }
106     }
107
108
109     public void waitUntilLeader() {
110         for (int i = 0; i < 10; i++) {
111             if (isLeader()) {
112                 break;
113             }
114             Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
115         }
116     }
117
118     public List<Object> getState() {
119         return state;
120     }
121
122     @Override
123     protected void applyState(ActorRef clientActor, Identifier identifier, Object data) {
124         actorDelegate.applyState(clientActor, identifier, data);
125         LOG.info("{}: applyState called: {}", persistenceId(), data);
126
127         state.add(data);
128     }
129
130     @Override
131     @Nonnull
132     protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
133         return this;
134     }
135
136     @Override
137     protected RaftActorSnapshotCohort getRaftActorSnapshotCohort() {
138         return this;
139     }
140
141     @Override
142     public void startLogRecoveryBatch(int maxBatchSize) {
143     }
144
145     @Override
146     public void appendRecoveredLogEntry(Payload data) {
147         state.add(data);
148     }
149
150     @Override
151     public void applyCurrentLogRecoveryBatch() {
152     }
153
154     @Override
155     protected void onRecoveryComplete() {
156         actorDelegate.onRecoveryComplete();
157         recoveryComplete.countDown();
158     }
159
160     @Override
161     protected void initializeBehavior() {
162         super.initializeBehavior();
163         initializeBehaviorComplete.countDown();
164     }
165
166     @Override
167     public void applyRecoverySnapshot(byte[] bytes) {
168         recoveryCohortDelegate.applyRecoverySnapshot(bytes);
169         applySnapshotBytes(bytes);
170     }
171
172     private void applySnapshotBytes(byte[] bytes) {
173         if (bytes.length == 0) {
174             return;
175         }
176
177         try {
178             Object data = toObject(bytes);
179             if (data instanceof List) {
180                 state.clear();
181                 state.addAll((List<?>) data);
182             }
183         } catch (ClassNotFoundException | IOException e) {
184             Throwables.propagate(e);
185         }
186     }
187
188     @Override
189     public void createSnapshot(ActorRef actorRef) {
190         LOG.info("{}: createSnapshot called", persistenceId());
191         snapshotCohortDelegate.createSnapshot(actorRef);
192     }
193
194     @Override
195     public void applySnapshot(byte [] snapshot) {
196         LOG.info("{}: applySnapshot called", persistenceId());
197         applySnapshotBytes(snapshot);
198         snapshotCohortDelegate.applySnapshot(snapshot);
199     }
200
201     @Override
202     protected void onStateChanged() {
203         actorDelegate.onStateChanged();
204     }
205
206     @Override
207     protected Optional<ActorRef> getRoleChangeNotifier() {
208         return Optional.fromNullable(roleChangeNotifier);
209     }
210
211     @Override public String persistenceId() {
212         return this.getId();
213     }
214
215     protected void newBehavior(RaftActorBehavior newBehavior) {
216         self().tell(newBehavior, ActorRef.noSender());
217     }
218
219     @Override
220     protected void handleCommand(final Object message) {
221         if (message instanceof RaftActorBehavior) {
222             super.changeCurrentBehavior((RaftActorBehavior)message);
223         } else {
224             super.handleCommand(message);
225
226             if (RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT.equals(message)) {
227                 snapshotCommitted.countDown();
228             }
229         }
230     }
231
232     @Override
233     protected void pauseLeader(Runnable operation) {
234         if (pauseLeaderFunction != null) {
235             pauseLeaderFunction.apply(operation);
236         } else {
237             super.pauseLeader(operation);
238         }
239     }
240
241     public static Object toObject(byte[] bs) throws ClassNotFoundException, IOException {
242         Object obj = null;
243         ByteArrayInputStream bis = null;
244         ObjectInputStream ois = null;
245         try {
246             bis = new ByteArrayInputStream(bs);
247             ois = new ObjectInputStream(bis);
248             obj = ois.readObject();
249         } finally {
250             if (bis != null) {
251                 bis.close();
252             }
253             if (ois != null) {
254                 ois.close();
255             }
256         }
257         return obj;
258     }
259
260     public ReplicatedLog getReplicatedLog() {
261         return this.getRaftActorContext().getReplicatedLog();
262     }
263
264     @Override
265     public byte[] getRestoreFromSnapshot() {
266         return restoreFromSnapshot;
267     }
268
269     public static Props props(final String id, final Map<String, String> peerAddresses, ConfigParams config) {
270         return builder().id(id).peerAddresses(peerAddresses).config(config).props();
271     }
272
273     public static Props props(final String id, final Map<String, String> peerAddresses,
274                               ConfigParams config, DataPersistenceProvider dataPersistenceProvider) {
275         return builder().id(id).peerAddresses(peerAddresses).config(config)
276                 .dataPersistenceProvider(dataPersistenceProvider).props();
277     }
278
279     public static Builder builder() {
280         return new Builder();
281     }
282
283     public static class AbstractBuilder<T extends AbstractBuilder<T, A>, A extends MockRaftActor> {
284         private Map<String, String> peerAddresses = Collections.emptyMap();
285         private String id;
286         private ConfigParams config;
287         private DataPersistenceProvider dataPersistenceProvider;
288         private ActorRef roleChangeNotifier;
289         private RaftActorSnapshotMessageSupport snapshotMessageSupport;
290         private byte[] restoreFromSnapshot;
291         private Optional<Boolean> persistent = Optional.absent();
292         private final Class<A> actorClass;
293         private Function<Runnable, Void> pauseLeaderFunction;
294         private RaftActorSnapshotCohort snapshotCohort;
295
296         protected AbstractBuilder(Class<A> actorClass) {
297             this.actorClass = actorClass;
298         }
299
300         @SuppressWarnings("unchecked")
301         private T self() {
302             return (T) this;
303         }
304
305         public T id(String newId) {
306             this.id = newId;
307             return self();
308         }
309
310         public T peerAddresses(Map<String, String> newPeerAddresses) {
311             this.peerAddresses = newPeerAddresses;
312             return self();
313         }
314
315         public T config(ConfigParams newConfig) {
316             this.config = newConfig;
317             return self();
318         }
319
320         public T dataPersistenceProvider(DataPersistenceProvider newDataPersistenceProvider) {
321             this.dataPersistenceProvider = newDataPersistenceProvider;
322             return self();
323         }
324
325         public T roleChangeNotifier(ActorRef newRoleChangeNotifier) {
326             this.roleChangeNotifier = newRoleChangeNotifier;
327             return self();
328         }
329
330         public T snapshotMessageSupport(RaftActorSnapshotMessageSupport newSnapshotMessageSupport) {
331             this.snapshotMessageSupport = newSnapshotMessageSupport;
332             return self();
333         }
334
335         public T restoreFromSnapshot(byte[] newRestoreFromSnapshot) {
336             this.restoreFromSnapshot = newRestoreFromSnapshot;
337             return self();
338         }
339
340         public T persistent(Optional<Boolean> newPersistent) {
341             this.persistent = newPersistent;
342             return self();
343         }
344
345         public T pauseLeaderFunction(Function<Runnable, Void> newPauseLeaderFunction) {
346             this.pauseLeaderFunction = newPauseLeaderFunction;
347             return self();
348         }
349
350         public T snapshotCohort(RaftActorSnapshotCohort newSnapshotCohort) {
351             this.snapshotCohort = newSnapshotCohort;
352             return self();
353         }
354
355         public Props props() {
356             return Props.create(actorClass, this);
357         }
358     }
359
360     public static class Builder extends AbstractBuilder<Builder, MockRaftActor> {
361         private Builder() {
362             super(MockRaftActor.class);
363         }
364     }
365 }