Bug 7362: Notify applyState synchronously
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / MockRaftActor.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
4  *
5  * This program and the accompanying materials are made available under the
6  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7  * and is available at http://www.eclipse.org/legal/epl-v10.html
8  */
9 package org.opendaylight.controller.cluster.raft;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.mockito.Mockito.mock;
13
14 import akka.actor.ActorRef;
15 import akka.actor.Props;
16 import com.google.common.base.Function;
17 import com.google.common.base.Optional;
18 import com.google.common.base.Throwables;
19 import com.google.common.util.concurrent.Uninterruptibles;
20 import java.io.ByteArrayInputStream;
21 import java.io.IOException;
22 import java.io.ObjectInputStream;
23 import java.util.ArrayList;
24 import java.util.Collections;
25 import java.util.List;
26 import java.util.Map;
27 import java.util.concurrent.CountDownLatch;
28 import java.util.concurrent.TimeUnit;
29 import javax.annotation.Nonnull;
30 import org.opendaylight.controller.cluster.DataPersistenceProvider;
31 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
32 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
33 import org.opendaylight.yangtools.concepts.Identifier;
34
35 public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort {
36     public static final short PAYLOAD_VERSION = 5;
37
38     final RaftActor actorDelegate;
39     final RaftActorRecoveryCohort recoveryCohortDelegate;
40     volatile RaftActorSnapshotCohort snapshotCohortDelegate;
41     private final CountDownLatch recoveryComplete = new CountDownLatch(1);
42     private final List<Object> state;
43     private final ActorRef roleChangeNotifier;
44     protected final CountDownLatch initializeBehaviorComplete = new CountDownLatch(1);
45     private RaftActorRecoverySupport raftActorRecoverySupport;
46     private RaftActorSnapshotMessageSupport snapshotMessageSupport;
47     private final byte[] restoreFromSnapshot;
48     final CountDownLatch snapshotCommitted = new CountDownLatch(1);
49     private final Function<Runnable, Void> pauseLeaderFunction;
50
51     protected MockRaftActor(AbstractBuilder<?, ?> builder) {
52         super(builder.id, builder.peerAddresses != null ? builder.peerAddresses :
53             Collections.<String, String>emptyMap(), Optional.fromNullable(builder.config), PAYLOAD_VERSION);
54         state = new ArrayList<>();
55         this.actorDelegate = mock(RaftActor.class);
56         this.recoveryCohortDelegate = mock(RaftActorRecoveryCohort.class);
57
58         this.snapshotCohortDelegate = builder.snapshotCohort != null ? builder.snapshotCohort :
59             mock(RaftActorSnapshotCohort.class);
60
61         if (builder.dataPersistenceProvider == null) {
62             setPersistence(builder.persistent.isPresent() ? builder.persistent.get() : true);
63         } else {
64             setPersistence(builder.dataPersistenceProvider);
65         }
66
67         roleChangeNotifier = builder.roleChangeNotifier;
68         snapshotMessageSupport = builder.snapshotMessageSupport;
69         restoreFromSnapshot = builder.restoreFromSnapshot;
70         pauseLeaderFunction = builder.pauseLeaderFunction;
71     }
72
73     public void setRaftActorRecoverySupport(RaftActorRecoverySupport support) {
74         raftActorRecoverySupport = support;
75     }
76
77     @Override
78     public RaftActorRecoverySupport newRaftActorRecoverySupport() {
79         return raftActorRecoverySupport != null ? raftActorRecoverySupport : super.newRaftActorRecoverySupport();
80     }
81
82     @Override
83     protected RaftActorSnapshotMessageSupport newRaftActorSnapshotMessageSupport() {
84         return snapshotMessageSupport != null ? snapshotMessageSupport :
85             (snapshotMessageSupport = super.newRaftActorSnapshotMessageSupport());
86     }
87
88     @Override
89     public RaftActorContext getRaftActorContext() {
90         return super.getRaftActorContext();
91     }
92
93     public RaftActorSnapshotMessageSupport getSnapshotMessageSupport() {
94         return snapshotMessageSupport;
95     }
96
97     public void waitForRecoveryComplete() {
98         try {
99             assertEquals("Recovery complete", true, recoveryComplete.await(5,  TimeUnit.SECONDS));
100         } catch (InterruptedException e) {
101             Throwables.propagate(e);
102         }
103     }
104
105     public void waitForInitializeBehaviorComplete() {
106         try {
107             assertEquals("Behavior initialized", true, initializeBehaviorComplete.await(5,  TimeUnit.SECONDS));
108         } catch (InterruptedException e) {
109             Throwables.propagate(e);
110         }
111     }
112
113
114     public void waitUntilLeader() {
115         for (int i = 0; i < 10; i++) {
116             if (isLeader()) {
117                 break;
118             }
119             Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
120         }
121     }
122
123     public List<Object> getState() {
124         return state;
125     }
126
127     @Override
128     protected void applyState(ActorRef clientActor, Identifier identifier, Object data) {
129         actorDelegate.applyState(clientActor, identifier, data);
130         LOG.info("{}: applyState called: {}", persistenceId(), data);
131
132         state.add(data);
133     }
134
135     @Override
136     @Nonnull
137     protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
138         return this;
139     }
140
141     @Override
142     protected RaftActorSnapshotCohort getRaftActorSnapshotCohort() {
143         return this;
144     }
145
146     @Override
147     public void startLogRecoveryBatch(int maxBatchSize) {
148     }
149
150     @Override
151     public void appendRecoveredLogEntry(Payload data) {
152         state.add(data);
153     }
154
155     @Override
156     public void applyCurrentLogRecoveryBatch() {
157     }
158
159     @Override
160     protected void onRecoveryComplete() {
161         actorDelegate.onRecoveryComplete();
162         recoveryComplete.countDown();
163     }
164
165     @Override
166     protected void initializeBehavior() {
167         super.initializeBehavior();
168         initializeBehaviorComplete.countDown();
169     }
170
171     @Override
172     public void applyRecoverySnapshot(byte[] bytes) {
173         recoveryCohortDelegate.applyRecoverySnapshot(bytes);
174         applySnapshotBytes(bytes);
175     }
176
177     private void applySnapshotBytes(byte[] bytes) {
178         if (bytes.length == 0) {
179             return;
180         }
181
182         try {
183             Object data = toObject(bytes);
184             if (data instanceof List) {
185                 state.clear();
186                 state.addAll((List<?>) data);
187             }
188         } catch (ClassNotFoundException | IOException e) {
189             Throwables.propagate(e);
190         }
191     }
192
193     @Override
194     public void createSnapshot(ActorRef actorRef) {
195         LOG.info("{}: createSnapshot called", persistenceId());
196         snapshotCohortDelegate.createSnapshot(actorRef);
197     }
198
199     @Override
200     public void applySnapshot(byte [] snapshot) {
201         LOG.info("{}: applySnapshot called", persistenceId());
202         applySnapshotBytes(snapshot);
203         snapshotCohortDelegate.applySnapshot(snapshot);
204     }
205
206     @Override
207     protected void onStateChanged() {
208         actorDelegate.onStateChanged();
209     }
210
211     @Override
212     protected Optional<ActorRef> getRoleChangeNotifier() {
213         return Optional.fromNullable(roleChangeNotifier);
214     }
215
216     @Override public String persistenceId() {
217         return this.getId();
218     }
219
220     protected void newBehavior(RaftActorBehavior newBehavior) {
221         self().tell(newBehavior, ActorRef.noSender());
222     }
223
224     @Override
225     protected void handleCommand(final Object message) {
226         if (message instanceof RaftActorBehavior) {
227             super.changeCurrentBehavior((RaftActorBehavior)message);
228         } else {
229             super.handleCommand(message);
230
231             if (RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT.equals(message)) {
232                 snapshotCommitted.countDown();
233             }
234         }
235     }
236
237     @Override
238     protected void pauseLeader(Runnable operation) {
239         if (pauseLeaderFunction != null) {
240             pauseLeaderFunction.apply(operation);
241         } else {
242             super.pauseLeader(operation);
243         }
244     }
245
246     public static Object toObject(byte[] bs) throws ClassNotFoundException, IOException {
247         Object obj = null;
248         ByteArrayInputStream bis = null;
249         ObjectInputStream ois = null;
250         try {
251             bis = new ByteArrayInputStream(bs);
252             ois = new ObjectInputStream(bis);
253             obj = ois.readObject();
254         } finally {
255             if (bis != null) {
256                 bis.close();
257             }
258             if (ois != null) {
259                 ois.close();
260             }
261         }
262         return obj;
263     }
264
265     public ReplicatedLog getReplicatedLog() {
266         return this.getRaftActorContext().getReplicatedLog();
267     }
268
269     @Override
270     public byte[] getRestoreFromSnapshot() {
271         return restoreFromSnapshot;
272     }
273
274     public static Props props(final String id, final Map<String, String> peerAddresses, ConfigParams config) {
275         return builder().id(id).peerAddresses(peerAddresses).config(config).props();
276     }
277
278     public static Props props(final String id, final Map<String, String> peerAddresses,
279                               ConfigParams config, DataPersistenceProvider dataPersistenceProvider) {
280         return builder().id(id).peerAddresses(peerAddresses).config(config)
281                 .dataPersistenceProvider(dataPersistenceProvider).props();
282     }
283
284     public static Builder builder() {
285         return new Builder();
286     }
287
288     public static class AbstractBuilder<T extends AbstractBuilder<T, A>, A extends MockRaftActor> {
289         private Map<String, String> peerAddresses = Collections.emptyMap();
290         private String id;
291         private ConfigParams config;
292         private DataPersistenceProvider dataPersistenceProvider;
293         private ActorRef roleChangeNotifier;
294         private RaftActorSnapshotMessageSupport snapshotMessageSupport;
295         private byte[] restoreFromSnapshot;
296         private Optional<Boolean> persistent = Optional.absent();
297         private final Class<A> actorClass;
298         private Function<Runnable, Void> pauseLeaderFunction;
299         private RaftActorSnapshotCohort snapshotCohort;
300
301         protected AbstractBuilder(Class<A> actorClass) {
302             this.actorClass = actorClass;
303         }
304
305         @SuppressWarnings("unchecked")
306         private T self() {
307             return (T) this;
308         }
309
310         public T id(String newId) {
311             this.id = newId;
312             return self();
313         }
314
315         public T peerAddresses(Map<String, String> newPeerAddresses) {
316             this.peerAddresses = newPeerAddresses;
317             return self();
318         }
319
320         public T config(ConfigParams newConfig) {
321             this.config = newConfig;
322             return self();
323         }
324
325         public T dataPersistenceProvider(DataPersistenceProvider newDataPersistenceProvider) {
326             this.dataPersistenceProvider = newDataPersistenceProvider;
327             return self();
328         }
329
330         public T roleChangeNotifier(ActorRef newRoleChangeNotifier) {
331             this.roleChangeNotifier = newRoleChangeNotifier;
332             return self();
333         }
334
335         public T snapshotMessageSupport(RaftActorSnapshotMessageSupport newSnapshotMessageSupport) {
336             this.snapshotMessageSupport = newSnapshotMessageSupport;
337             return self();
338         }
339
340         public T restoreFromSnapshot(byte[] newRestoreFromSnapshot) {
341             this.restoreFromSnapshot = newRestoreFromSnapshot;
342             return self();
343         }
344
345         public T persistent(Optional<Boolean> newPersistent) {
346             this.persistent = newPersistent;
347             return self();
348         }
349
350         public T pauseLeaderFunction(Function<Runnable, Void> newPauseLeaderFunction) {
351             this.pauseLeaderFunction = newPauseLeaderFunction;
352             return self();
353         }
354
355         public T snapshotCohort(RaftActorSnapshotCohort newSnapshotCohort) {
356             this.snapshotCohort = newSnapshotCohort;
357             return self();
358         }
359
360         public Props props() {
361             return Props.create(actorClass, this);
362         }
363     }
364
365     public static class Builder extends AbstractBuilder<Builder, MockRaftActor> {
366         private Builder() {
367             super(MockRaftActor.class);
368         }
369     }
370 }