Bug 8606: Continue leadership transfer on pauseLeader timeout
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / MockRaftActor.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
4  *
5  * This program and the accompanying materials are made available under the
6  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7  * and is available at http://www.eclipse.org/legal/epl-v10.html
8  */
9 package org.opendaylight.controller.cluster.raft;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.mockito.Mockito.mock;
13
14 import akka.actor.ActorRef;
15 import akka.actor.Props;
16 import com.google.common.base.Function;
17 import com.google.common.base.Optional;
18 import com.google.common.base.Throwables;
19 import com.google.common.io.ByteSource;
20 import com.google.common.util.concurrent.Uninterruptibles;
21 import java.io.IOException;
22 import java.io.OutputStream;
23 import java.util.ArrayList;
24 import java.util.Collections;
25 import java.util.List;
26 import java.util.Map;
27 import java.util.concurrent.CountDownLatch;
28 import java.util.concurrent.TimeUnit;
29 import javax.annotation.Nonnull;
30 import org.apache.commons.lang3.SerializationUtils;
31 import org.opendaylight.controller.cluster.DataPersistenceProvider;
32 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
33 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
34 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
35 import org.opendaylight.yangtools.concepts.Identifier;
36
37 public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort {
38     public static final short PAYLOAD_VERSION = 5;
39
40     final RaftActor actorDelegate;
41     final RaftActorRecoveryCohort recoveryCohortDelegate;
42     volatile RaftActorSnapshotCohort snapshotCohortDelegate;
43     private final CountDownLatch recoveryComplete = new CountDownLatch(1);
44     private final List<Object> state;
45     private final ActorRef roleChangeNotifier;
46     protected final CountDownLatch initializeBehaviorComplete = new CountDownLatch(1);
47     private RaftActorRecoverySupport raftActorRecoverySupport;
48     private RaftActorSnapshotMessageSupport snapshotMessageSupport;
49     private final Snapshot restoreFromSnapshot;
50     final CountDownLatch snapshotCommitted = new CountDownLatch(1);
51     private final Function<Runnable, Void> pauseLeaderFunction;
52
53     protected MockRaftActor(AbstractBuilder<?, ?> builder) {
54         super(builder.id, builder.peerAddresses != null ? builder.peerAddresses :
55             Collections.<String, String>emptyMap(), Optional.fromNullable(builder.config), PAYLOAD_VERSION);
56         state = new ArrayList<>();
57         this.actorDelegate = mock(RaftActor.class);
58         this.recoveryCohortDelegate = mock(RaftActorRecoveryCohort.class);
59
60         this.snapshotCohortDelegate = builder.snapshotCohort != null ? builder.snapshotCohort :
61             mock(RaftActorSnapshotCohort.class);
62
63         if (builder.dataPersistenceProvider == null) {
64             setPersistence(builder.persistent.isPresent() ? builder.persistent.get() : true);
65         } else {
66             setPersistence(builder.dataPersistenceProvider);
67         }
68
69         roleChangeNotifier = builder.roleChangeNotifier;
70         snapshotMessageSupport = builder.snapshotMessageSupport;
71         restoreFromSnapshot = builder.restoreFromSnapshot;
72         pauseLeaderFunction = builder.pauseLeaderFunction;
73     }
74
75     public void setRaftActorRecoverySupport(RaftActorRecoverySupport support) {
76         raftActorRecoverySupport = support;
77     }
78
79     @Override
80     public RaftActorRecoverySupport newRaftActorRecoverySupport() {
81         return raftActorRecoverySupport != null ? raftActorRecoverySupport : super.newRaftActorRecoverySupport();
82     }
83
84     @Override
85     protected RaftActorSnapshotMessageSupport newRaftActorSnapshotMessageSupport() {
86         return snapshotMessageSupport != null ? snapshotMessageSupport :
87             (snapshotMessageSupport = super.newRaftActorSnapshotMessageSupport());
88     }
89
90     @Override
91     public RaftActorContext getRaftActorContext() {
92         return super.getRaftActorContext();
93     }
94
95     public RaftActorSnapshotMessageSupport getSnapshotMessageSupport() {
96         return snapshotMessageSupport;
97     }
98
99     public void waitForRecoveryComplete() {
100         try {
101             assertEquals("Recovery complete", true, recoveryComplete.await(5,  TimeUnit.SECONDS));
102         } catch (InterruptedException e) {
103             Throwables.propagate(e);
104         }
105     }
106
107     public void waitForInitializeBehaviorComplete() {
108         try {
109             assertEquals("Behavior initialized", true, initializeBehaviorComplete.await(5,  TimeUnit.SECONDS));
110         } catch (InterruptedException e) {
111             Throwables.propagate(e);
112         }
113     }
114
115
116     public void waitUntilLeader() {
117         for (int i = 0; i < 10; i++) {
118             if (isLeader()) {
119                 break;
120             }
121             Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
122         }
123     }
124
125     public List<Object> getState() {
126         return state;
127     }
128
129     @Override
130     protected void applyState(ActorRef clientActor, Identifier identifier, Object data) {
131         actorDelegate.applyState(clientActor, identifier, data);
132         LOG.info("{}: applyState called: {}", persistenceId(), data);
133
134         state.add(data);
135     }
136
137     @Override
138     @Nonnull
139     protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
140         return this;
141     }
142
143     @Override
144     protected RaftActorSnapshotCohort getRaftActorSnapshotCohort() {
145         return this;
146     }
147
148     @Override
149     public void startLogRecoveryBatch(int maxBatchSize) {
150     }
151
152     @Override
153     public void appendRecoveredLogEntry(Payload data) {
154         state.add(data);
155     }
156
157     @Override
158     public void applyCurrentLogRecoveryBatch() {
159     }
160
161     @Override
162     protected void onRecoveryComplete() {
163         actorDelegate.onRecoveryComplete();
164         recoveryComplete.countDown();
165     }
166
167     @Override
168     protected void initializeBehavior() {
169         super.initializeBehavior();
170         initializeBehaviorComplete.countDown();
171     }
172
173     @Override
174     public void applyRecoverySnapshot(Snapshot.State newState) {
175         recoveryCohortDelegate.applyRecoverySnapshot(newState);
176         applySnapshotState(newState);
177     }
178
179     private void applySnapshotState(Snapshot.State newState) {
180         if (newState instanceof MockSnapshotState) {
181             state.clear();
182             state.addAll(((MockSnapshotState)newState).getState());
183         }
184     }
185
186     @Override
187     public void createSnapshot(ActorRef actorRef, java.util.Optional<OutputStream> installSnapshotStream) {
188         LOG.info("{}: createSnapshot called", persistenceId());
189         snapshotCohortDelegate.createSnapshot(actorRef, installSnapshotStream);
190     }
191
192     @Override
193     public void applySnapshot(Snapshot.State newState) {
194         LOG.info("{}: applySnapshot called", persistenceId());
195         applySnapshotState(newState);
196         snapshotCohortDelegate.applySnapshot(newState);
197     }
198
199     @Override
200     public Snapshot.State deserializeSnapshot(ByteSource snapshotBytes) {
201         try {
202             return (Snapshot.State) SerializationUtils.deserialize(snapshotBytes.read());
203         } catch (IOException e) {
204             throw new RuntimeException("Error deserializing state", e);
205         }
206     }
207
208     @Override
209     protected void onStateChanged() {
210         actorDelegate.onStateChanged();
211     }
212
213     @Override
214     protected Optional<ActorRef> getRoleChangeNotifier() {
215         return Optional.fromNullable(roleChangeNotifier);
216     }
217
218     @Override public String persistenceId() {
219         return this.getId();
220     }
221
222     protected void newBehavior(RaftActorBehavior newBehavior) {
223         self().tell(newBehavior, ActorRef.noSender());
224     }
225
226     @Override
227     protected void handleCommand(final Object message) {
228         if (message instanceof RaftActorBehavior) {
229             super.changeCurrentBehavior((RaftActorBehavior)message);
230         } else {
231             super.handleCommand(message);
232
233             if (RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT.equals(message)) {
234                 snapshotCommitted.countDown();
235             }
236         }
237     }
238
239     @Override
240     protected void pauseLeader(Runnable operation) {
241         if (pauseLeaderFunction != null) {
242             pauseLeaderFunction.apply(operation);
243         } else {
244             super.pauseLeader(operation);
245         }
246     }
247
248     public static List<Object> fromState(Snapshot.State from) {
249         if (from instanceof MockSnapshotState) {
250             return ((MockSnapshotState)from).getState();
251         }
252
253         throw new IllegalStateException("Unexpected snapshot State: " + from);
254     }
255
256     public ReplicatedLog getReplicatedLog() {
257         return this.getRaftActorContext().getReplicatedLog();
258     }
259
260     @Override
261     public Snapshot getRestoreFromSnapshot() {
262         return restoreFromSnapshot;
263     }
264
265     public static Props props(final String id, final Map<String, String> peerAddresses, ConfigParams config) {
266         return builder().id(id).peerAddresses(peerAddresses).config(config).props();
267     }
268
269     public static Props props(final String id, final Map<String, String> peerAddresses,
270                               ConfigParams config, DataPersistenceProvider dataPersistenceProvider) {
271         return builder().id(id).peerAddresses(peerAddresses).config(config)
272                 .dataPersistenceProvider(dataPersistenceProvider).props();
273     }
274
275     public static Builder builder() {
276         return new Builder();
277     }
278
279     public static class AbstractBuilder<T extends AbstractBuilder<T, A>, A extends MockRaftActor> {
280         private Map<String, String> peerAddresses = Collections.emptyMap();
281         private String id;
282         private ConfigParams config;
283         private DataPersistenceProvider dataPersistenceProvider;
284         private ActorRef roleChangeNotifier;
285         private RaftActorSnapshotMessageSupport snapshotMessageSupport;
286         private Snapshot restoreFromSnapshot;
287         private Optional<Boolean> persistent = Optional.absent();
288         private final Class<A> actorClass;
289         private Function<Runnable, Void> pauseLeaderFunction;
290         private RaftActorSnapshotCohort snapshotCohort;
291
292         protected AbstractBuilder(Class<A> actorClass) {
293             this.actorClass = actorClass;
294         }
295
296         @SuppressWarnings("unchecked")
297         private T self() {
298             return (T) this;
299         }
300
301         public T id(String newId) {
302             this.id = newId;
303             return self();
304         }
305
306         public T peerAddresses(Map<String, String> newPeerAddresses) {
307             this.peerAddresses = newPeerAddresses;
308             return self();
309         }
310
311         public T config(ConfigParams newConfig) {
312             this.config = newConfig;
313             return self();
314         }
315
316         public T dataPersistenceProvider(DataPersistenceProvider newDataPersistenceProvider) {
317             this.dataPersistenceProvider = newDataPersistenceProvider;
318             return self();
319         }
320
321         public T roleChangeNotifier(ActorRef newRoleChangeNotifier) {
322             this.roleChangeNotifier = newRoleChangeNotifier;
323             return self();
324         }
325
326         public T snapshotMessageSupport(RaftActorSnapshotMessageSupport newSnapshotMessageSupport) {
327             this.snapshotMessageSupport = newSnapshotMessageSupport;
328             return self();
329         }
330
331         public T restoreFromSnapshot(Snapshot newRestoreFromSnapshot) {
332             this.restoreFromSnapshot = newRestoreFromSnapshot;
333             return self();
334         }
335
336         public T persistent(Optional<Boolean> newPersistent) {
337             this.persistent = newPersistent;
338             return self();
339         }
340
341         public T pauseLeaderFunction(Function<Runnable, Void> newPauseLeaderFunction) {
342             this.pauseLeaderFunction = newPauseLeaderFunction;
343             return self();
344         }
345
346         public T snapshotCohort(RaftActorSnapshotCohort newSnapshotCohort) {
347             this.snapshotCohort = newSnapshotCohort;
348             return self();
349         }
350
351         public Props props() {
352             return Props.create(actorClass, this);
353         }
354     }
355
356     public static class Builder extends AbstractBuilder<Builder, MockRaftActor> {
357         private Builder() {
358             super(MockRaftActor.class);
359         }
360     }
361
362     public static class MockSnapshotState implements Snapshot.State {
363         private static final long serialVersionUID = 1L;
364
365         private final List<Object> state;
366
367         public MockSnapshotState(List<Object> state) {
368             this.state = state;
369         }
370
371         public List<Object> getState() {
372             return state;
373         }
374
375         @Override
376         public int hashCode() {
377             final int prime = 31;
378             int result = 1;
379             result = prime * result + (state == null ? 0 : state.hashCode());
380             return result;
381         }
382
383         @Override
384         public boolean equals(Object obj) {
385             if (this == obj) {
386                 return true;
387             }
388             if (obj == null) {
389                 return false;
390             }
391             if (getClass() != obj.getClass()) {
392                 return false;
393             }
394             MockSnapshotState other = (MockSnapshotState) obj;
395             if (state == null) {
396                 if (other.state != null) {
397                     return false;
398                 }
399             } else if (!state.equals(other.state)) {
400                 return false;
401             }
402             return true;
403         }
404
405         @Override
406         public String toString() {
407             return "MockSnapshotState [state=" + state + "]";
408         }
409     }
410 }