2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
3 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
5 * This program and the accompanying materials are made available under the
6 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.raft;
11 import static org.junit.Assert.assertEquals;
12 import static org.mockito.Mockito.mock;
14 import akka.actor.ActorRef;
15 import akka.actor.Props;
16 import com.google.common.base.Function;
17 import com.google.common.base.Optional;
18 import com.google.common.io.ByteSource;
19 import com.google.common.util.concurrent.Uninterruptibles;
20 import java.io.IOException;
21 import java.io.OutputStream;
22 import java.util.ArrayList;
23 import java.util.Collections;
24 import java.util.List;
26 import java.util.concurrent.CountDownLatch;
27 import java.util.concurrent.TimeUnit;
28 import org.apache.commons.lang3.SerializationUtils;
29 import org.opendaylight.controller.cluster.DataPersistenceProvider;
30 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
31 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
32 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
33 import org.opendaylight.yangtools.concepts.Identifier;
35 public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort {
36 public static final short PAYLOAD_VERSION = 5;
38 final RaftActor actorDelegate;
39 final RaftActorRecoveryCohort recoveryCohortDelegate;
40 volatile RaftActorSnapshotCohort snapshotCohortDelegate;
41 private final CountDownLatch recoveryComplete = new CountDownLatch(1);
42 private final List<Object> state;
43 private final ActorRef roleChangeNotifier;
44 protected final CountDownLatch initializeBehaviorComplete = new CountDownLatch(1);
45 private RaftActorRecoverySupport raftActorRecoverySupport;
46 private RaftActorSnapshotMessageSupport snapshotMessageSupport;
47 private final Snapshot restoreFromSnapshot;
48 final CountDownLatch snapshotCommitted = new CountDownLatch(1);
49 private final Function<Runnable, Void> pauseLeaderFunction;
51 protected MockRaftActor(final AbstractBuilder<?, ?> builder) {
52 super(builder.id, builder.peerAddresses != null ? builder.peerAddresses :
53 Collections.<String, String>emptyMap(), Optional.fromNullable(builder.config), PAYLOAD_VERSION);
54 state = Collections.synchronizedList(new ArrayList<>());
55 this.actorDelegate = mock(RaftActor.class);
56 this.recoveryCohortDelegate = mock(RaftActorRecoveryCohort.class);
58 this.snapshotCohortDelegate = builder.snapshotCohort != null ? builder.snapshotCohort :
59 mock(RaftActorSnapshotCohort.class);
61 if (builder.dataPersistenceProvider == null) {
62 setPersistence(builder.persistent.isPresent() ? builder.persistent.get() : true);
64 setPersistence(builder.dataPersistenceProvider);
67 roleChangeNotifier = builder.roleChangeNotifier;
68 snapshotMessageSupport = builder.snapshotMessageSupport;
69 restoreFromSnapshot = builder.restoreFromSnapshot;
70 pauseLeaderFunction = builder.pauseLeaderFunction;
73 public void setRaftActorRecoverySupport(final RaftActorRecoverySupport support) {
74 raftActorRecoverySupport = support;
78 public RaftActorRecoverySupport newRaftActorRecoverySupport() {
79 return raftActorRecoverySupport != null ? raftActorRecoverySupport : super.newRaftActorRecoverySupport();
83 protected RaftActorSnapshotMessageSupport newRaftActorSnapshotMessageSupport() {
84 return snapshotMessageSupport != null ? snapshotMessageSupport :
85 (snapshotMessageSupport = super.newRaftActorSnapshotMessageSupport());
89 public RaftActorContext getRaftActorContext() {
90 return super.getRaftActorContext();
93 public RaftActorSnapshotMessageSupport getSnapshotMessageSupport() {
94 return snapshotMessageSupport;
97 public void waitForRecoveryComplete() {
99 assertEquals("Recovery complete", true, recoveryComplete.await(5, TimeUnit.SECONDS));
100 } catch (InterruptedException e) {
101 throw new RuntimeException(e);
105 public void waitForInitializeBehaviorComplete() {
107 assertEquals("Behavior initialized", true, initializeBehaviorComplete.await(5, TimeUnit.SECONDS));
108 } catch (InterruptedException e) {
109 throw new RuntimeException(e);
114 public void waitUntilLeader() {
115 for (int i = 0; i < 10; i++) {
119 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
123 public List<Object> getState() {
128 protected void applyState(final ActorRef clientActor, final Identifier identifier, final Object data) {
129 actorDelegate.applyState(clientActor, identifier, data);
130 LOG.info("{}: applyState called: {}", persistenceId(), data);
136 protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
141 protected RaftActorSnapshotCohort getRaftActorSnapshotCohort() {
146 public void startLogRecoveryBatch(final int maxBatchSize) {
150 public void appendRecoveredLogEntry(final Payload data) {
155 public void applyCurrentLogRecoveryBatch() {
159 protected void onRecoveryComplete() {
160 actorDelegate.onRecoveryComplete();
161 recoveryComplete.countDown();
165 protected void initializeBehavior() {
166 super.initializeBehavior();
167 initializeBehaviorComplete.countDown();
171 public void applyRecoverySnapshot(final Snapshot.State newState) {
172 recoveryCohortDelegate.applyRecoverySnapshot(newState);
173 applySnapshotState(newState);
176 private void applySnapshotState(final Snapshot.State newState) {
177 if (newState instanceof MockSnapshotState) {
179 state.addAll(((MockSnapshotState)newState).getState());
184 public void createSnapshot(final ActorRef actorRef, final java.util.Optional<OutputStream> installSnapshotStream) {
185 LOG.info("{}: createSnapshot called", persistenceId());
186 snapshotCohortDelegate.createSnapshot(actorRef, installSnapshotStream);
190 public void applySnapshot(final Snapshot.State newState) {
191 LOG.info("{}: applySnapshot called", persistenceId());
192 applySnapshotState(newState);
193 snapshotCohortDelegate.applySnapshot(newState);
197 public Snapshot.State deserializeSnapshot(final ByteSource snapshotBytes) {
199 return (Snapshot.State) SerializationUtils.deserialize(snapshotBytes.read());
200 } catch (IOException e) {
201 throw new RuntimeException("Error deserializing state", e);
206 protected void onStateChanged() {
207 actorDelegate.onStateChanged();
211 protected Optional<ActorRef> getRoleChangeNotifier() {
212 return Optional.fromNullable(roleChangeNotifier);
215 @Override public String persistenceId() {
219 protected void newBehavior(final RaftActorBehavior newBehavior) {
220 self().tell(newBehavior, ActorRef.noSender());
224 protected void handleCommand(final Object message) {
225 if (message instanceof RaftActorBehavior) {
226 super.changeCurrentBehavior((RaftActorBehavior)message);
228 super.handleCommand(message);
230 if (RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT.equals(message)) {
231 snapshotCommitted.countDown();
237 protected void pauseLeader(final Runnable operation) {
238 if (pauseLeaderFunction != null) {
239 pauseLeaderFunction.apply(operation);
241 super.pauseLeader(operation);
245 public static List<Object> fromState(final Snapshot.State from) {
246 if (from instanceof MockSnapshotState) {
247 return ((MockSnapshotState)from).getState();
250 throw new IllegalStateException("Unexpected snapshot State: " + from);
253 public ReplicatedLog getReplicatedLog() {
254 return this.getRaftActorContext().getReplicatedLog();
258 public Snapshot getRestoreFromSnapshot() {
259 return restoreFromSnapshot;
262 public static Props props(final String id, final Map<String, String> peerAddresses, final ConfigParams config) {
263 return builder().id(id).peerAddresses(peerAddresses).config(config).props();
266 public static Props props(final String id, final Map<String, String> peerAddresses,
267 final ConfigParams config, final DataPersistenceProvider dataPersistenceProvider) {
268 return builder().id(id).peerAddresses(peerAddresses).config(config)
269 .dataPersistenceProvider(dataPersistenceProvider).props();
272 public static Builder builder() {
273 return new Builder();
276 public static class AbstractBuilder<T extends AbstractBuilder<T, A>, A extends MockRaftActor> {
277 private Map<String, String> peerAddresses = Collections.emptyMap();
279 private ConfigParams config;
280 private DataPersistenceProvider dataPersistenceProvider;
281 private ActorRef roleChangeNotifier;
282 private RaftActorSnapshotMessageSupport snapshotMessageSupport;
283 private Snapshot restoreFromSnapshot;
284 private Optional<Boolean> persistent = Optional.absent();
285 private final Class<A> actorClass;
286 private Function<Runnable, Void> pauseLeaderFunction;
287 private RaftActorSnapshotCohort snapshotCohort;
/**
 * Remembers the concrete actor class that {@link #props()} will pass to {@code Props.create}.
 *
 * @param actorClass the actor class this builder produces {@link Props} for
 */
protected AbstractBuilder(final Class<A> actorClass) {
    this.actorClass = actorClass;
}
293 @SuppressWarnings("unchecked")
298 public T id(final String newId) {
303 public T peerAddresses(final Map<String, String> newPeerAddresses) {
304 this.peerAddresses = newPeerAddresses;
308 public T config(final ConfigParams newConfig) {
309 this.config = newConfig;
313 public T dataPersistenceProvider(final DataPersistenceProvider newDataPersistenceProvider) {
314 this.dataPersistenceProvider = newDataPersistenceProvider;
318 public T roleChangeNotifier(final ActorRef newRoleChangeNotifier) {
319 this.roleChangeNotifier = newRoleChangeNotifier;
323 public T snapshotMessageSupport(final RaftActorSnapshotMessageSupport newSnapshotMessageSupport) {
324 this.snapshotMessageSupport = newSnapshotMessageSupport;
328 public T restoreFromSnapshot(final Snapshot newRestoreFromSnapshot) {
329 this.restoreFromSnapshot = newRestoreFromSnapshot;
333 public T persistent(final Optional<Boolean> newPersistent) {
334 this.persistent = newPersistent;
338 public T pauseLeaderFunction(final Function<Runnable, Void> newPauseLeaderFunction) {
339 this.pauseLeaderFunction = newPauseLeaderFunction;
343 public T snapshotCohort(final RaftActorSnapshotCohort newSnapshotCohort) {
344 this.snapshotCohort = newSnapshotCohort;
348 public Props props() {
349 return Props.create(actorClass, this);
353 public static class Builder extends AbstractBuilder<Builder, MockRaftActor> {
355 super(MockRaftActor.class);
359 public static class MockSnapshotState implements Snapshot.State {
360 private static final long serialVersionUID = 1L;
362 private final List<Object> state;
364 public MockSnapshotState(final List<Object> state) {
368 public List<Object> getState() {
373 public int hashCode() {
374 final int prime = 31;
376 result = prime * result + (state == null ? 0 : state.hashCode());
381 public boolean equals(final Object obj) {
388 if (getClass() != obj.getClass()) {
391 MockSnapshotState other = (MockSnapshotState) obj;
393 if (other.state != null) {
396 } else if (!state.equals(other.state)) {
403 public String toString() {
404 return "MockSnapshotState [state=" + state + "]";