2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
3 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
5 * This program and the accompanying materials are made available under the
6 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.raft;
11 import static org.junit.Assert.assertEquals;
12 import static org.mockito.Mockito.mock;
14 import akka.actor.ActorRef;
15 import akka.actor.Props;
16 import com.google.common.io.ByteSource;
17 import com.google.common.util.concurrent.Uninterruptibles;
18 import java.io.IOException;
19 import java.io.OutputStream;
20 import java.util.ArrayList;
21 import java.util.Collections;
22 import java.util.List;
24 import java.util.Objects;
25 import java.util.Optional;
26 import java.util.concurrent.CountDownLatch;
27 import java.util.concurrent.TimeUnit;
28 import java.util.function.Function;
29 import org.apache.commons.lang3.SerializationUtils;
30 import org.opendaylight.controller.cluster.DataPersistenceProvider;
31 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
32 import org.opendaylight.controller.cluster.raft.messages.Payload;
33 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
34 import org.opendaylight.yangtools.concepts.Identifier;
/**
 * Mock {@link RaftActor} that also implements the recovery-cohort and snapshot-cohort interfaces.
 *
 * <p>Several callbacks are forwarded to Mockito mocks, and {@link CountDownLatch} fields are
 * counted down at lifecycle points so that callers of the {@code waitFor...} methods can block on them.
 */
36 public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort {
// Payload version passed to the RaftActor super constructor.
37 public static final short PAYLOAD_VERSION = 5;
// Mockito mock that applyState, onRecoveryComplete and onStateChanged are forwarded to.
39 final RaftActor actorDelegate;
// Mockito mock that applyRecoverySnapshot is forwarded to.
40 final RaftActorRecoveryCohort recoveryCohortDelegate;
// Builder-supplied cohort, or a Mockito mock when none was given; receives createSnapshot/applySnapshot.
41 volatile RaftActorSnapshotCohort snapshotCohortDelegate;
// Counted down in onRecoveryComplete(); awaited by waitForRecoveryComplete().
42 private final CountDownLatch recoveryComplete = new CountDownLatch(1);
// Synchronized list created in the constructor; snapshot state entries are added to it.
43 private final List<Object> state;
// Optional notifier taken from the builder; may be null (see getRoleChangeNotifier()).
44 private final ActorRef roleChangeNotifier;
// Counted down after super.initializeBehavior() returns.
45 protected final CountDownLatch initializeBehaviorComplete = new CountDownLatch(1);
// When non-null, returned by newRaftActorRecoverySupport() instead of the superclass default.
46 private RaftActorRecoverySupport raftActorRecoverySupport;
// Builder-supplied instance, or the superclass default cached on first request.
47 private RaftActorSnapshotMessageSupport snapshotMessageSupport;
// Snapshot taken from the builder and returned by getRestoreFromSnapshot().
48 private final Snapshot restoreFromSnapshot;
// Counted down in handleCommand when the message equals COMMIT_SNAPSHOT.
49 final CountDownLatch snapshotCommitted = new CountDownLatch(1);
// Optional hook from the builder; when non-null pauseLeader() passes the operation to it.
50 private final Function<Runnable, Void> pauseLeaderFunction;
/**
 * Creates the actor from the values collected by the given builder.
 *
 * <p>A null peer-address map is replaced by an empty map and the config is wrapped in an
 * {@link Optional} before both are passed to the superclass constructor.
 *
 * @param builder source of the id, peers, config, persistence settings and optional test hooks
 */
52 protected MockRaftActor(final AbstractBuilder<?, ?> builder) {
53 super(builder.id, builder.peerAddresses != null ? builder.peerAddresses :
54 Collections.emptyMap(), Optional.ofNullable(builder.config), PAYLOAD_VERSION);
55 state = Collections.synchronizedList(new ArrayList<>());
56 actorDelegate = mock(RaftActor.class);
57 recoveryCohortDelegate = mock(RaftActorRecoveryCohort.class);
// Use the caller's snapshot cohort when one was supplied, otherwise a mock.
59 snapshotCohortDelegate = builder.snapshotCohort != null ? builder.snapshotCohort :
60 mock(RaftActorSnapshotCohort.class);
// No explicit provider: fall back to the boolean persistence flag, which defaults to true when unset.
62 if (builder.dataPersistenceProvider == null) {
63 setPersistence(builder.persistent.isPresent() ? builder.persistent.get() : true);
// NOTE(review): the line introducing this alternative branch is not visible in this view;
// presumably an else that installs the supplied provider - confirm.
65 setPersistence(builder.dataPersistenceProvider);
68 roleChangeNotifier = builder.roleChangeNotifier;
69 snapshotMessageSupport = builder.snapshotMessageSupport;
70 restoreFromSnapshot = builder.restoreFromSnapshot;
71 pauseLeaderFunction = builder.pauseLeaderFunction;
/**
 * Stores the recovery support instance that {@code newRaftActorRecoverySupport()} will return
 * in place of the superclass default.
 *
 * @param support the instance to return; null restores the superclass default
 */
74 public void setRaftActorRecoverySupport(final RaftActorRecoverySupport support) {
75 raftActorRecoverySupport = support;
/**
 * Returns the recovery support set through {@code setRaftActorRecoverySupport}, or the
 * superclass-created one when none has been set.
 */
79 public RaftActorRecoverySupport newRaftActorRecoverySupport() {
80 return raftActorRecoverySupport != null ? raftActorRecoverySupport : super.newRaftActorRecoverySupport();
/**
 * Returns the snapshot message support held in the field; when the field is null, the
 * superclass default is created, stored in the field, and returned.
 */
84 protected RaftActorSnapshotMessageSupport newRaftActorSnapshotMessageSupport() {
85 return snapshotMessageSupport != null ? snapshotMessageSupport :
86 (snapshotMessageSupport = super.newRaftActorSnapshotMessageSupport());
/**
 * Returns the superclass's {@link RaftActorContext}; declared here with public visibility.
 */
90 public RaftActorContext getRaftActorContext() {
91 return super.getRaftActorContext();
/**
 * Returns the current snapshot message support field; this is null if the builder supplied
 * none and {@code newRaftActorSnapshotMessageSupport()} has not yet populated it.
 */
94 public RaftActorSnapshotMessageSupport getSnapshotMessageSupport() {
95 return snapshotMessageSupport;
/**
 * Waits up to 5 seconds for the recovery-complete latch and asserts that it was released
 * within that time.
 *
 * @throws RuntimeException wrapping the {@link InterruptedException} if the wait is interrupted
 */
98 public void waitForRecoveryComplete() {
100 assertEquals("Recovery complete", true, recoveryComplete.await(5, TimeUnit.SECONDS));
101 } catch (InterruptedException e) {
// NOTE(review): the interrupt status is not restored before rethrowing.
102 throw new RuntimeException(e);
/**
 * Waits up to 5 seconds for the initialize-behavior latch and asserts that it was released
 * within that time.
 *
 * @throws RuntimeException wrapping the {@link InterruptedException} if the wait is interrupted
 */
106 public void waitForInitializeBehaviorComplete() {
108 assertEquals("Behavior initialized", true, initializeBehaviorComplete.await(5, TimeUnit.SECONDS));
109 } catch (InterruptedException e) {
// NOTE(review): the interrupt status is not restored before rethrowing.
110 throw new RuntimeException(e);
/**
 * Loops at most 10 times, sleeping 100 ms (uninterruptibly) per iteration.
 *
 * <p>NOTE(review): the lines between the loop header and the sleep are not visible in this
 * view; presumably they exit early once this actor is the leader - confirm.
 */
115 public void waitUntilLeader() {
116 for (int i = 0; i < 10; i++) {
120 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
/**
 * Accessor for recorded state objects.
 *
 * <p>NOTE(review): the body is not visible in this view; presumably returns the {@code state}
 * list - confirm.
 */
124 public List<Object> getState() {
/**
 * Forwards the call to the mock actor delegate and logs the applied data.
 *
 * <p>NOTE(review): any statements after the log call are not visible in this view.
 *
 * @param clientActor passed through to the delegate
 * @param identifier passed through to the delegate
 * @param data the applied data; passed to the delegate and included in the log message
 */
129 protected void applyState(final ActorRef clientActor, final Identifier identifier, final Object data) {
130 actorDelegate.applyState(clientActor, identifier, data);
131 LOG.info("{}: applyState called: {}", persistenceId(), data);
/**
 * Supplies the recovery cohort.
 *
 * <p>NOTE(review): the body is not visible in this view; since this class implements
 * {@link RaftActorRecoveryCohort} it presumably returns {@code this} - confirm.
 */
137 protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
/**
 * Supplies the snapshot cohort.
 *
 * <p>NOTE(review): the body is not visible in this view; since this class implements
 * {@link RaftActorSnapshotCohort} it presumably returns {@code this} - confirm.
 */
142 protected RaftActorSnapshotCohort getRaftActorSnapshotCohort() {
/**
 * Recovery-cohort callback taking the maximum batch size.
 *
 * <p>NOTE(review): the body is not visible in this view - confirm what, if anything, it does.
 *
 * @param maxBatchSize the maximum batch size supplied by the caller
 */
147 public void startLogRecoveryBatch(final int maxBatchSize) {
/**
 * Recovery-cohort callback taking a recovered log entry payload.
 *
 * <p>NOTE(review): the body is not visible in this view - confirm what, if anything, it does.
 *
 * @param data the recovered payload
 */
151 public void appendRecoveredLogEntry(final Payload data) {
/**
 * Recovery-cohort callback with no arguments.
 *
 * <p>NOTE(review): the body is not visible in this view - confirm what, if anything, it does.
 */
156 public void applyCurrentLogRecoveryBatch() {
/**
 * Forwards the callback to the mock actor delegate, then releases the recovery-complete latch
 * so that {@code waitForRecoveryComplete()} can return.
 */
160 protected void onRecoveryComplete() {
161 actorDelegate.onRecoveryComplete();
162 recoveryComplete.countDown();
/**
 * Runs the superclass behavior initialization, then releases the initialize-behavior latch
 * so that {@code waitForInitializeBehaviorComplete()} can return.
 */
166 protected void initializeBehavior() {
167 super.initializeBehavior();
168 initializeBehaviorComplete.countDown();
/**
 * Forwards the recovered snapshot state to the mock recovery cohort delegate, then applies it
 * to this actor's local state through {@code applySnapshotState}.
 *
 * @param newState the snapshot state being recovered
 */
172 public void applyRecoverySnapshot(final Snapshot.State newState) {
173 recoveryCohortDelegate.applyRecoverySnapshot(newState);
174 applySnapshotState(newState);
/**
 * Adds the entries of a {@link MockSnapshotState} to the local state list; other state types
 * do not enter the visible branch.
 *
 * <p>NOTE(review): one line between the type check and {@code addAll} is not visible in this
 * view; it possibly clears the list first - confirm.
 *
 * @param newState the snapshot state to take entries from
 */
177 private void applySnapshotState(final Snapshot.State newState) {
178 if (newState instanceof MockSnapshotState mockState) {
180 state.addAll(mockState.getState());
/**
 * Logs the call and forwards it unchanged to the snapshot cohort delegate.
 *
 * @param actorRef passed through to the delegate
 * @param installSnapshotStream passed through to the delegate
 */
185 public void createSnapshot(final ActorRef actorRef, final Optional<OutputStream> installSnapshotStream) {
186 LOG.info("{}: createSnapshot called", persistenceId());
187 snapshotCohortDelegate.createSnapshot(actorRef, installSnapshotStream);
/**
 * Logs the call, applies the snapshot state to this actor's local state, and then forwards
 * the call to the snapshot cohort delegate.
 *
 * @param newState the snapshot state to apply
 */
191 public void applySnapshot(final Snapshot.State newState) {
192 LOG.info("{}: applySnapshot called", persistenceId());
193 applySnapshotState(newState);
194 snapshotCohortDelegate.applySnapshot(newState);
/**
 * Reads all bytes from the source and deserializes them with
 * {@code SerializationUtils.deserialize}, casting the result to {@link Snapshot.State}.
 *
 * @param snapshotBytes source of the serialized snapshot state
 * @return the deserialized state
 * @throws RuntimeException wrapping the {@link IOException} if reading the bytes fails
 */
198 public Snapshot.State deserializeSnapshot(final ByteSource snapshotBytes) {
200 return (Snapshot.State) SerializationUtils.deserialize(snapshotBytes.read());
201 } catch (IOException e) {
202 throw new RuntimeException("Error deserializing state", e);
/**
 * Forwards the state-changed callback to the mock actor delegate.
 */
207 protected void onStateChanged() {
208 actorDelegate.onStateChanged();
/**
 * Returns the builder-supplied role change notifier, or an empty {@link Optional} when the
 * builder supplied none.
 */
212 protected Optional<ActorRef> getRoleChangeNotifier() {
213 return Optional.ofNullable(roleChangeNotifier);
/**
 * Returns this actor's persistence id, which the log statements in this class use as a prefix.
 *
 * <p>NOTE(review): the body is not visible in this view - confirm which value is returned.
 */
216 @Override public String persistenceId() {
/**
 * Sends the new behavior to this actor as a message with no sender; {@code handleCommand}
 * passes {@link RaftActorBehavior} messages to {@code changeCurrentBehavior}.
 *
 * @param newBehavior the behavior to switch to
 */
220 protected void newBehavior(final RaftActorBehavior newBehavior) {
221 self().tell(newBehavior, ActorRef.noSender());
/**
 * Handles an incoming message: a {@link RaftActorBehavior} message is passed to
 * {@code changeCurrentBehavior}; a message equal to {@code COMMIT_SNAPSHOT} releases the
 * snapshot-committed latch.
 *
 * <p>NOTE(review): the line separating the behavior branch from the
 * {@code super.handleCommand} call is not visible in this view; presumably an else, so other
 * messages go to the superclass handler - confirm.
 *
 * @param message the message to handle
 */
225 protected void handleCommand(final Object message) {
226 if (message instanceof RaftActorBehavior) {
227 super.changeCurrentBehavior((RaftActorBehavior)message);
229 super.handleCommand(message);
231 if (RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT.equals(message)) {
232 snapshotCommitted.countDown();
/**
 * Passes the operation to the builder-supplied pause function when one is set.
 *
 * <p>NOTE(review): the line separating the two calls is not visible in this view; presumably
 * an else, so the superclass implementation runs only when no function is set - confirm.
 *
 * @param operation the operation handed to the pause function or to the superclass
 */
238 protected void pauseLeader(final Runnable operation) {
239 if (pauseLeaderFunction != null) {
240 pauseLeaderFunction.apply(operation);
242 super.pauseLeader(operation);
/**
 * Extracts the state list from a {@link MockSnapshotState}.
 *
 * @param from the snapshot state to unwrap
 * @return the list held by the given state
 * @throws IllegalStateException if {@code from} is not a {@link MockSnapshotState}
 */
246 public static List<Object> fromState(final Snapshot.State from) {
247 if (from instanceof MockSnapshotState mockState) {
248 return mockState.getState();
251 throw new IllegalStateException("Unexpected snapshot State: " + from);
/**
 * Returns the replicated log obtained from this actor's {@link RaftActorContext}.
 */
254 public ReplicatedLog getReplicatedLog() {
255 return getRaftActorContext().getReplicatedLog();
/**
 * Returns the snapshot supplied through the builder, or null when none was supplied.
 */
259 public Snapshot getRestoreFromSnapshot() {
260 return restoreFromSnapshot;
/**
 * Builds {@link Props} for a {@code MockRaftActor} with the given id, peers and config.
 *
 * @param id value passed to the builder's {@code id}
 * @param peerAddresses value passed to the builder's {@code peerAddresses}
 * @param config value passed to the builder's {@code config}
 * @return the Props produced by the builder
 */
263 public static Props props(final String id, final Map<String, String> peerAddresses, final ConfigParams config) {
264 return builder().id(id).peerAddresses(peerAddresses).config(config).props();
/**
 * Builds {@link Props} for a {@code MockRaftActor} with the given id, peers, config and an
 * explicit data persistence provider.
 *
 * @param id value passed to the builder's {@code id}
 * @param peerAddresses value passed to the builder's {@code peerAddresses}
 * @param config value passed to the builder's {@code config}
 * @param dataPersistenceProvider value passed to the builder's {@code dataPersistenceProvider}
 * @return the Props produced by the builder
 */
267 public static Props props(final String id, final Map<String, String> peerAddresses,
268 final ConfigParams config, final DataPersistenceProvider dataPersistenceProvider) {
269 return builder().id(id).peerAddresses(peerAddresses).config(config)
270 .dataPersistenceProvider(dataPersistenceProvider).props();
/**
 * Returns a new {@link Builder} for {@code MockRaftActor}.
 */
273 public static Builder builder() {
274 return new Builder();
/**
 * Base builder that collects the values the {@code MockRaftActor} constructor reads and
 * produces {@link Props} for the configured actor class.
 *
 * <p>NOTE(review): the return statement of each setter is not visible in this view;
 * presumably each returns the builder itself as {@code T} - confirm.
 *
 * @param <T> the concrete builder type returned by the setters
 * @param <A> the actor class the Props are created for
 */
277 public static class AbstractBuilder<T extends AbstractBuilder<T, A>, A extends MockRaftActor> {
// Defaults to an empty map; the actor constructor also tolerates null here.
278 private Map<String, String> peerAddresses = Collections.emptyMap();
280 private ConfigParams config;
// When non-null, the actor constructor installs it instead of using the persistent flag.
281 private DataPersistenceProvider dataPersistenceProvider;
282 private ActorRef roleChangeNotifier;
283 private RaftActorSnapshotMessageSupport snapshotMessageSupport;
284 private Snapshot restoreFromSnapshot;
// Empty means "not specified"; the actor constructor then uses true.
285 private Optional<Boolean> persistent = Optional.empty();
// Actor class handed to Props.create in props().
286 private final Class<A> actorClass;
287 private Function<Runnable, Void> pauseLeaderFunction;
288 private RaftActorSnapshotCohort snapshotCohort;
/**
 * Creates a builder for the given actor class.
 *
 * @param actorClass the class that {@code props()} creates Props for
 */
290 protected AbstractBuilder(final Class<A> actorClass) {
291 this.actorClass = actorClass;
// NOTE(review): the member this annotation applies to is not visible in this view.
294 @SuppressWarnings("unchecked")
// NOTE(review): the body is not visible in this view; presumably stores the actor id - confirm.
299 public T id(final String newId) {
/** Sets the peer address map passed to the actor's super constructor. */
304 public T peerAddresses(final Map<String, String> newPeerAddresses) {
305 peerAddresses = newPeerAddresses;
// NOTE(review): the body is not visible in this view; presumably stores the config - confirm.
309 public T config(final ConfigParams newConfig) {
/** Sets an explicit data persistence provider for the actor. */
314 public T dataPersistenceProvider(final DataPersistenceProvider newDataPersistenceProvider) {
315 dataPersistenceProvider = newDataPersistenceProvider;
/** Sets the actor reference exposed through {@code getRoleChangeNotifier()}. */
319 public T roleChangeNotifier(final ActorRef newRoleChangeNotifier) {
320 roleChangeNotifier = newRoleChangeNotifier;
/** Sets the snapshot message support the actor uses instead of the superclass default. */
324 public T snapshotMessageSupport(final RaftActorSnapshotMessageSupport newSnapshotMessageSupport) {
325 snapshotMessageSupport = newSnapshotMessageSupport;
/** Sets the snapshot returned by the actor's {@code getRestoreFromSnapshot()}. */
329 public T restoreFromSnapshot(final Snapshot newRestoreFromSnapshot) {
330 restoreFromSnapshot = newRestoreFromSnapshot;
/** Sets the persistence flag used when no data persistence provider is given. */
334 public T persistent(final Optional<Boolean> newPersistent) {
335 persistent = newPersistent;
/** Sets the function the actor's {@code pauseLeader} passes its operation to. */
339 public T pauseLeaderFunction(final Function<Runnable, Void> newPauseLeaderFunction) {
340 pauseLeaderFunction = newPauseLeaderFunction;
/** Sets the snapshot cohort the actor delegates to instead of a mock. */
344 public T snapshotCohort(final RaftActorSnapshotCohort newSnapshotCohort) {
345 snapshotCohort = newSnapshotCohort;
/** Creates {@link Props} for the actor class, passing this builder as the creation argument. */
349 public Props props() {
350 return Props.create(actorClass, this);
/**
 * Concrete builder bound to {@code MockRaftActor} itself.
 *
 * <p>NOTE(review): the constructor header enclosing the {@code super(...)} call is not
 * visible in this view.
 */
354 public static class Builder extends AbstractBuilder<Builder, MockRaftActor> {
356 super(MockRaftActor.class);
/**
 * {@link Snapshot.State} implementation that holds a list of state objects.
 *
 * <p>NOTE(review): several body lines (constructor assignment, getter return, parts of
 * {@code equals}) are not visible in this view; the comments below cover only what is shown.
 */
360 public static class MockSnapshotState implements Snapshot.State {
361 private static final long serialVersionUID = 1L;
// The wrapped state objects; basis for hashCode, equals and toString.
363 private final List<Object> state;
// NOTE(review): body not visible; presumably stores the given list in the field - confirm
// whether a defensive copy is made.
365 public MockSnapshotState(final List<Object> state) {
// NOTE(review): body not visible; presumably returns the state list - confirm.
369 public List<Object> getState() {
/** Hash code derived from the state list. */
374 public int hashCode() {
375 return Objects.hash(state);
/**
 * Equality check; the visible lines compare the runtime classes and then the state lists.
 * NOTE(review): the identity/null checks and the return statements are not visible in this view.
 */
379 public boolean equals(final Object obj) {
386 if (getClass() != obj.getClass()) {
389 MockSnapshotState other = (MockSnapshotState) obj;
390 if (!Objects.equals(state, other.state)) {
/** Returns a string containing the state list. */
397 public String toString() {
398 return "MockSnapshotState [state=" + state + "]";