2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
3 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
5 * This program and the accompanying materials are made available under the
6 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.raft;
11 import static org.junit.Assert.assertEquals;
12 import static org.mockito.Mockito.mock;
14 import akka.actor.ActorRef;
15 import akka.actor.Props;
16 import com.google.common.base.Function;
17 import com.google.common.base.Optional;
18 import com.google.common.base.Throwables;
19 import com.google.common.io.ByteSource;
20 import com.google.common.util.concurrent.Uninterruptibles;
21 import java.io.IOException;
22 import java.io.OutputStream;
23 import java.util.ArrayList;
24 import java.util.Collections;
25 import java.util.List;
27 import java.util.concurrent.CountDownLatch;
28 import java.util.concurrent.TimeUnit;
29 import javax.annotation.Nonnull;
30 import org.apache.commons.lang3.SerializationUtils;
31 import org.opendaylight.controller.cluster.DataPersistenceProvider;
32 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
33 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
34 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
35 import org.opendaylight.yangtools.concepts.Identifier;
37 public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort {
38 public static final short PAYLOAD_VERSION = 5;
40 final RaftActor actorDelegate;
41 final RaftActorRecoveryCohort recoveryCohortDelegate;
42 volatile RaftActorSnapshotCohort snapshotCohortDelegate;
43 private final CountDownLatch recoveryComplete = new CountDownLatch(1);
44 private final List<Object> state;
45 private final ActorRef roleChangeNotifier;
46 protected final CountDownLatch initializeBehaviorComplete = new CountDownLatch(1);
47 private RaftActorRecoverySupport raftActorRecoverySupport;
48 private RaftActorSnapshotMessageSupport snapshotMessageSupport;
49 private final byte[] restoreFromSnapshot;
50 final CountDownLatch snapshotCommitted = new CountDownLatch(1);
51 private final Function<Runnable, Void> pauseLeaderFunction;
53 protected MockRaftActor(AbstractBuilder<?, ?> builder) {
54 super(builder.id, builder.peerAddresses != null ? builder.peerAddresses :
55 Collections.<String, String>emptyMap(), Optional.fromNullable(builder.config), PAYLOAD_VERSION);
56 state = new ArrayList<>();
57 this.actorDelegate = mock(RaftActor.class);
58 this.recoveryCohortDelegate = mock(RaftActorRecoveryCohort.class);
60 this.snapshotCohortDelegate = builder.snapshotCohort != null ? builder.snapshotCohort :
61 mock(RaftActorSnapshotCohort.class);
63 if (builder.dataPersistenceProvider == null) {
64 setPersistence(builder.persistent.isPresent() ? builder.persistent.get() : true);
66 setPersistence(builder.dataPersistenceProvider);
69 roleChangeNotifier = builder.roleChangeNotifier;
70 snapshotMessageSupport = builder.snapshotMessageSupport;
71 restoreFromSnapshot = builder.restoreFromSnapshot;
72 pauseLeaderFunction = builder.pauseLeaderFunction;
75 public void setRaftActorRecoverySupport(RaftActorRecoverySupport support) {
76 raftActorRecoverySupport = support;
80 public RaftActorRecoverySupport newRaftActorRecoverySupport() {
81 return raftActorRecoverySupport != null ? raftActorRecoverySupport : super.newRaftActorRecoverySupport();
85 protected RaftActorSnapshotMessageSupport newRaftActorSnapshotMessageSupport() {
86 return snapshotMessageSupport != null ? snapshotMessageSupport :
87 (snapshotMessageSupport = super.newRaftActorSnapshotMessageSupport());
91 public RaftActorContext getRaftActorContext() {
92 return super.getRaftActorContext();
95 public RaftActorSnapshotMessageSupport getSnapshotMessageSupport() {
96 return snapshotMessageSupport;
99 public void waitForRecoveryComplete() {
101 assertEquals("Recovery complete", true, recoveryComplete.await(5, TimeUnit.SECONDS));
102 } catch (InterruptedException e) {
103 Throwables.propagate(e);
107 public void waitForInitializeBehaviorComplete() {
109 assertEquals("Behavior initialized", true, initializeBehaviorComplete.await(5, TimeUnit.SECONDS));
110 } catch (InterruptedException e) {
111 Throwables.propagate(e);
116 public void waitUntilLeader() {
117 for (int i = 0; i < 10; i++) {
121 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
125 public List<Object> getState() {
130 protected void applyState(ActorRef clientActor, Identifier identifier, Object data) {
131 actorDelegate.applyState(clientActor, identifier, data);
132 LOG.info("{}: applyState called: {}", persistenceId(), data);
139 protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
144 protected RaftActorSnapshotCohort getRaftActorSnapshotCohort() {
149 public void startLogRecoveryBatch(int maxBatchSize) {
153 public void appendRecoveredLogEntry(Payload data) {
158 public void applyCurrentLogRecoveryBatch() {
162 protected void onRecoveryComplete() {
163 actorDelegate.onRecoveryComplete();
164 recoveryComplete.countDown();
168 protected void initializeBehavior() {
169 super.initializeBehavior();
170 initializeBehaviorComplete.countDown();
174 public void applyRecoverySnapshot(Snapshot.State newState) {
175 recoveryCohortDelegate.applyRecoverySnapshot(newState);
176 applySnapshotState(newState);
179 private void applySnapshotState(Snapshot.State newState) {
180 if (newState instanceof MockSnapshotState) {
182 state.addAll(((MockSnapshotState)newState).getState());
187 public void createSnapshot(ActorRef actorRef, java.util.Optional<OutputStream> installSnapshotStream) {
188 LOG.info("{}: createSnapshot called", persistenceId());
189 snapshotCohortDelegate.createSnapshot(actorRef, installSnapshotStream);
193 public void applySnapshot(Snapshot.State newState) {
194 LOG.info("{}: applySnapshot called", persistenceId());
195 applySnapshotState(newState);
196 snapshotCohortDelegate.applySnapshot(newState);
200 public Snapshot.State deserializeSnapshot(ByteSource snapshotBytes) {
202 return (Snapshot.State) SerializationUtils.deserialize(snapshotBytes.read());
203 } catch (IOException e) {
204 throw new RuntimeException("Error deserializing state", e);
209 public Snapshot.State deserializePreCarbonSnapshot(byte[] from) {
210 return new MockSnapshotState(SerializationUtils.deserialize(from));
214 protected void onStateChanged() {
215 actorDelegate.onStateChanged();
219 protected Optional<ActorRef> getRoleChangeNotifier() {
220 return Optional.fromNullable(roleChangeNotifier);
223 @Override public String persistenceId() {
227 protected void newBehavior(RaftActorBehavior newBehavior) {
228 self().tell(newBehavior, ActorRef.noSender());
232 protected void handleCommand(final Object message) {
233 if (message instanceof RaftActorBehavior) {
234 super.changeCurrentBehavior((RaftActorBehavior)message);
236 super.handleCommand(message);
238 if (RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT.equals(message)) {
239 snapshotCommitted.countDown();
245 protected void pauseLeader(Runnable operation) {
246 if (pauseLeaderFunction != null) {
247 pauseLeaderFunction.apply(operation);
249 super.pauseLeader(operation);
253 public static List<Object> fromState(Snapshot.State from) {
254 if (from instanceof MockSnapshotState) {
255 return ((MockSnapshotState)from).getState();
258 throw new IllegalStateException("Unexpected snapshot State: " + from);
261 public ReplicatedLog getReplicatedLog() {
262 return this.getRaftActorContext().getReplicatedLog();
266 public byte[] getRestoreFromSnapshot() {
267 return restoreFromSnapshot;
270 public static Props props(final String id, final Map<String, String> peerAddresses, ConfigParams config) {
271 return builder().id(id).peerAddresses(peerAddresses).config(config).props();
274 public static Props props(final String id, final Map<String, String> peerAddresses,
275 ConfigParams config, DataPersistenceProvider dataPersistenceProvider) {
276 return builder().id(id).peerAddresses(peerAddresses).config(config)
277 .dataPersistenceProvider(dataPersistenceProvider).props();
280 public static Builder builder() {
281 return new Builder();
284 public static class AbstractBuilder<T extends AbstractBuilder<T, A>, A extends MockRaftActor> {
285 private Map<String, String> peerAddresses = Collections.emptyMap();
287 private ConfigParams config;
288 private DataPersistenceProvider dataPersistenceProvider;
289 private ActorRef roleChangeNotifier;
290 private RaftActorSnapshotMessageSupport snapshotMessageSupport;
291 private byte[] restoreFromSnapshot;
292 private Optional<Boolean> persistent = Optional.absent();
293 private final Class<A> actorClass;
294 private Function<Runnable, Void> pauseLeaderFunction;
295 private RaftActorSnapshotCohort snapshotCohort;
297 protected AbstractBuilder(Class<A> actorClass) {
298 this.actorClass = actorClass;
301 @SuppressWarnings("unchecked")
306 public T id(String newId) {
311 public T peerAddresses(Map<String, String> newPeerAddresses) {
312 this.peerAddresses = newPeerAddresses;
316 public T config(ConfigParams newConfig) {
317 this.config = newConfig;
321 public T dataPersistenceProvider(DataPersistenceProvider newDataPersistenceProvider) {
322 this.dataPersistenceProvider = newDataPersistenceProvider;
326 public T roleChangeNotifier(ActorRef newRoleChangeNotifier) {
327 this.roleChangeNotifier = newRoleChangeNotifier;
331 public T snapshotMessageSupport(RaftActorSnapshotMessageSupport newSnapshotMessageSupport) {
332 this.snapshotMessageSupport = newSnapshotMessageSupport;
336 public T restoreFromSnapshot(byte[] newRestoreFromSnapshot) {
337 this.restoreFromSnapshot = newRestoreFromSnapshot;
341 public T persistent(Optional<Boolean> newPersistent) {
342 this.persistent = newPersistent;
346 public T pauseLeaderFunction(Function<Runnable, Void> newPauseLeaderFunction) {
347 this.pauseLeaderFunction = newPauseLeaderFunction;
351 public T snapshotCohort(RaftActorSnapshotCohort newSnapshotCohort) {
352 this.snapshotCohort = newSnapshotCohort;
356 public Props props() {
357 return Props.create(actorClass, this);
361 public static class Builder extends AbstractBuilder<Builder, MockRaftActor> {
363 super(MockRaftActor.class);
367 public static class MockSnapshotState implements Snapshot.State {
368 private static final long serialVersionUID = 1L;
370 private final List<Object> state;
372 public MockSnapshotState(List<Object> state) {
376 public List<Object> getState() {
381 public int hashCode() {
382 final int prime = 31;
384 result = prime * result + (state == null ? 0 : state.hashCode());
389 public boolean equals(Object obj) {
396 if (getClass() != obj.getClass()) {
399 MockSnapshotState other = (MockSnapshotState) obj;
401 if (other.state != null) {
404 } else if (!state.equals(other.state)) {
411 public String toString() {
412 return "MockSnapshotState [state=" + state + "]";