2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
3 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
5 * This program and the accompanying materials are made available under the
6 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.raft;
11 import static org.junit.Assert.assertEquals;
12 import static org.mockito.Mockito.any;
13 import static org.mockito.Mockito.doNothing;
14 import static org.mockito.Mockito.mock;
15 import akka.actor.ActorRef;
16 import akka.actor.Props;
17 import com.google.common.base.Function;
18 import com.google.common.base.Optional;
19 import com.google.common.util.concurrent.Uninterruptibles;
20 import java.io.ByteArrayInputStream;
21 import java.io.IOException;
22 import java.io.ObjectInputStream;
23 import java.util.ArrayList;
24 import java.util.Collections;
25 import java.util.List;
27 import java.util.concurrent.CountDownLatch;
28 import java.util.concurrent.TimeUnit;
29 import javax.annotation.Nonnull;
30 import org.opendaylight.controller.cluster.DataPersistenceProvider;
31 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
32 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
/**
 * Test double for {@link RaftActor} that also acts as its own recovery and snapshot cohort.
 * Callbacks are forwarded to Mockito mocks so tests can verify them, and latches let tests
 * block until recovery, behavior initialization or a snapshot commit has happened.
 */
34 public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort {
// Payload version handed to the RaftActor super constructor.
35 public static final short PAYLOAD_VERSION = 5;
// Mockito mock of RaftActor; applyState, onRecoveryComplete and onStateChanged forward to it.
37 final RaftActor actorDelegate;
// Mockito mock that receives every applyRecoverySnapshot call.
38 final RaftActorRecoveryCohort recoveryCohortDelegate;
// Mockito mock that receives createSnapshot/applySnapshot calls; non-final and volatile, so it can be swapped.
39 volatile RaftActorSnapshotCohort snapshotCohortDelegate;
// Counted down in onRecoveryComplete(); awaited by waitForRecoveryComplete().
40 private final CountDownLatch recoveryComplete = new CountDownLatch(1);
// In-memory state; list-typed snapshot contents are appended to it by applySnapshotBytes().
41 private final List<Object> state;
// Optional notifier taken from the builder; may be null (see getRoleChangeNotifier()).
42 private final ActorRef roleChangeNotifier;
// Counted down after super.initializeBehavior(); awaited by waitForInitializeBehaviorComplete().
43 protected final CountDownLatch initializeBehaviorComplete = new CountDownLatch(1);
// Optional override returned by newRaftActorRecoverySupport() when non-null.
44 private RaftActorRecoverySupport raftActorRecoverySupport;
// Taken from the builder, or lazily created by newRaftActorSnapshotMessageSupport() when null.
45 private RaftActorSnapshotMessageSupport snapshotMessageSupport;
// Snapshot bytes taken from the builder and exposed through getRestoreFromSnapshot().
46 private final byte[] restoreFromSnapshot;
// Counted down in handleCommand() when a COMMIT_SNAPSHOT message has been handled.
47 final CountDownLatch snapshotCommitted = new CountDownLatch(1);
// Optional hook taken from the builder; when non-null pauseLeader() calls it instead of the superclass.
48 private final Function<Runnable, Void> pauseLeaderFunction;
50 protected MockRaftActor(AbstractBuilder<?, ?> builder) {
51 super(builder.id, builder.peerAddresses, Optional.fromNullable(builder.config), PAYLOAD_VERSION);
52 state = new ArrayList<>();
53 this.actorDelegate = mock(RaftActor.class);
54 doNothing().when(this.actorDelegate).onRecoveryComplete();
55 doNothing().when(this.actorDelegate).onStateChanged();
56 doNothing().when(this.actorDelegate).applyState(any(ActorRef.class), any(String.class), any(Object.class));
58 this.recoveryCohortDelegate = mock(RaftActorRecoveryCohort.class);
59 doNothing().when(this.recoveryCohortDelegate).applyRecoverySnapshot(any(byte[].class));
61 this.snapshotCohortDelegate = mock(RaftActorSnapshotCohort.class);
62 doNothing().when(this.snapshotCohortDelegate).applySnapshot(any(byte[].class));
63 doNothing().when(this.snapshotCohortDelegate).createSnapshot(any(ActorRef.class));
65 if(builder.dataPersistenceProvider == null){
66 setPersistence(builder.persistent.isPresent() ? builder.persistent.get() : true);
68 setPersistence(builder.dataPersistenceProvider);
71 roleChangeNotifier = builder.roleChangeNotifier;
72 snapshotMessageSupport = builder.snapshotMessageSupport;
73 restoreFromSnapshot = builder.restoreFromSnapshot;
74 pauseLeaderFunction = builder.pauseLeaderFunction;
77 public void setRaftActorRecoverySupport(RaftActorRecoverySupport support) {
78 raftActorRecoverySupport = support;
82 public RaftActorRecoverySupport newRaftActorRecoverySupport() {
83 return raftActorRecoverySupport != null ? raftActorRecoverySupport : super.newRaftActorRecoverySupport();
87 protected RaftActorSnapshotMessageSupport newRaftActorSnapshotMessageSupport() {
88 return snapshotMessageSupport != null ? snapshotMessageSupport :
89 (snapshotMessageSupport = super.newRaftActorSnapshotMessageSupport());
92 public RaftActorSnapshotMessageSupport getSnapshotMessageSupport() {
93 return snapshotMessageSupport;
96 public void waitForRecoveryComplete() {
98 assertEquals("Recovery complete", true, recoveryComplete.await(5, TimeUnit.SECONDS));
99 } catch (InterruptedException e) {
104 public void waitForInitializeBehaviorComplete() {
106 assertEquals("Behavior initialized", true, initializeBehaviorComplete.await(5, TimeUnit.SECONDS));
107 } catch (InterruptedException e) {
// Runs at most 10 iterations, sleeping 100 ms in each one (roughly one second overall).
// NOTE(review): the lines between the loop header and the sleep are not visible in this excerpt;
// presumably they test for leadership and leave the loop early — confirm against the full file.
113 public void waitUntilLeader(){
114 for(int i = 0;i < 10; i++){
// Uninterruptible sleep: no InterruptedException handling is needed here.
118 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
// Accessor declared to return a List<Object>.
// NOTE(review): only the signature is visible in this excerpt; presumably the body returns the
// `state` field — confirm against the full file.
122 public List<Object> getState() {
// Forwards the call to the mocked actorDelegate so tests can verify it, then logs the applied data.
// NOTE(review): any statements after the log call are not visible in this excerpt (the data may
// also be recorded in `state`) — confirm against the full file.
127 @Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
128 actorDelegate.applyState(clientActor, identifier, data);
129 LOG.info("{}: applyState called: {}", persistenceId(), data);
// Supplies the recovery cohort for this actor.
// NOTE(review): the body is not visible in this excerpt; since this class itself implements
// RaftActorRecoveryCohort it presumably returns `this` — confirm against the full file.
136 protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
// Supplies the snapshot cohort for this actor.
// NOTE(review): the body is not visible in this excerpt; since this class itself implements
// RaftActorSnapshotCohort it presumably returns `this` — confirm against the full file.
141 protected RaftActorSnapshotCohort getRaftActorSnapshotCohort() {
// Callback taking the maximum batch size for log recovery.
// NOTE(review): the body is not visible in this excerpt — confirm whether it is a no-op.
146 public void startLogRecoveryBatch(int maxBatchSize) {
// Callback receiving the payload of one recovered log entry.
// NOTE(review): the body is not visible in this excerpt — confirm what is done with the payload.
150 public void appendRecoveredLogEntry(Payload data) {
// Callback with no arguments, named for applying the current log recovery batch.
// NOTE(review): the body is not visible in this excerpt — confirm whether it is a no-op.
155 public void applyCurrentLogRecoveryBatch() {
159 protected void onRecoveryComplete() {
160 actorDelegate.onRecoveryComplete();
161 recoveryComplete.countDown();
165 protected void initializeBehavior() {
166 super.initializeBehavior();
167 initializeBehaviorComplete.countDown();
171 public void applyRecoverySnapshot(byte[] bytes) {
172 recoveryCohortDelegate.applyRecoverySnapshot(bytes);
173 applySnapshotBytes(bytes);
// Deserializes the snapshot bytes via toObject(bytes) and, when the result is a List, appends all
// of its elements to `state`; the visible code does nothing for other result types.
// NOTE(review): the enclosing `try {` and the body of the catch block are not visible in this
// excerpt, so how a deserialization failure is handled cannot be confirmed from here.
176 private void applySnapshotBytes(byte[] bytes) {
178 Object data = toObject(bytes);
179 if (data instanceof List) {
// Appends rather than replaces: entries already in `state` are kept.
180 state.addAll((List<?>) data);
182 } catch (Exception e) {
188 public void createSnapshot(ActorRef actorRef) {
189 LOG.info("{}: createSnapshot called", persistenceId());
190 snapshotCohortDelegate.createSnapshot(actorRef);
194 public void applySnapshot(byte [] snapshot) {
195 LOG.info("{}: applySnapshot called", persistenceId());
196 applySnapshotBytes(snapshot);
197 snapshotCohortDelegate.applySnapshot(snapshot);
201 protected void onStateChanged() {
202 actorDelegate.onStateChanged();
206 protected Optional<ActorRef> getRoleChangeNotifier() {
207 return Optional.fromNullable(roleChangeNotifier);
// Identifier used, among other things, as the prefix of this class's log messages.
// NOTE(review): the body is not visible in this excerpt; presumably it returns the actor id
// supplied by the builder — confirm against the full file.
210 @Override public String persistenceId() {
214 protected void newBehavior(RaftActorBehavior newBehavior) {
215 self().tell(newBehavior, ActorRef.noSender());
219 public void handleCommand(final Object message) {
220 if(message instanceof RaftActorBehavior) {
221 super.changeCurrentBehavior((RaftActorBehavior)message);
223 super.handleCommand(message);
225 if(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT.equals(message)) {
226 snapshotCommitted.countDown();
232 protected void pauseLeader(Runnable operation) {
233 if(pauseLeaderFunction != null) {
234 pauseLeaderFunction.apply(operation);
236 super.pauseLeader(operation);
240 public static Object toObject(byte[] bs) throws ClassNotFoundException, IOException {
242 ByteArrayInputStream bis = null;
243 ObjectInputStream ois = null;
245 bis = new ByteArrayInputStream(bs);
246 ois = new ObjectInputStream(bis);
247 obj = ois.readObject();
259 public ReplicatedLog getReplicatedLog(){
260 return this.getRaftActorContext().getReplicatedLog();
264 public byte[] getRestoreFromSnapshot() {
265 return restoreFromSnapshot;
268 public static Props props(final String id, final Map<String, String> peerAddresses,
269 ConfigParams config){
270 return builder().id(id).peerAddresses(peerAddresses).config(config).props();
273 public static Props props(final String id, final Map<String, String> peerAddresses,
274 ConfigParams config, DataPersistenceProvider dataPersistenceProvider){
275 return builder().id(id).peerAddresses(peerAddresses).config(config).
276 dataPersistenceProvider(dataPersistenceProvider).props();
279 public static Builder builder() {
280 return new Builder();
/**
 * Self-typed builder for {@code MockRaftActor} and its subclasses.
 *
 * @param <T> the concrete builder type, used as the return type of every setter
 * @param <A> the actor class that {@link #props()} creates
 */
283 public static class AbstractBuilder<T extends AbstractBuilder<T, A>, A extends MockRaftActor> {
// Defaults to an empty map, so an actor built without peers still gets a non-null map.
284 private Map<String, String> peerAddresses = Collections.emptyMap();
// May stay null; the MockRaftActor constructor wraps it with Optional.fromNullable.
286 private ConfigParams config;
// When non-null, the MockRaftActor constructor uses it instead of the boolean `persistent` flag.
287 private DataPersistenceProvider dataPersistenceProvider;
288 private ActorRef roleChangeNotifier;
289 private RaftActorSnapshotMessageSupport snapshotMessageSupport;
290 private byte[] restoreFromSnapshot;
// Absent by default; the MockRaftActor constructor then falls back to `true`.
291 private Optional<Boolean> persistent = Optional.absent();
// Actor class instantiated by props().
292 private final Class<A> actorClass;
293 private Function<Runnable, Void> pauseLeaderFunction;
// Subclasses pass the concrete actor class that props() should create.
295 protected AbstractBuilder(Class<A> actorClass) {
296 this.actorClass = actorClass;
// NOTE(review): the method this annotation originally applies to is not visible in this excerpt
// (presumably a self-cast helper returning T) — confirm against the full file.
299 @SuppressWarnings("unchecked")
// NOTE(review): for each setter below only the field assignment (if any) is visible; presumably
// each one returns this builder as T for chaining — confirm against the full file.
304 public T id(String id) {
309 public T peerAddresses(Map<String, String> peerAddresses) {
310 this.peerAddresses = peerAddresses;
314 public T config(ConfigParams config) {
315 this.config = config;
319 public T dataPersistenceProvider(DataPersistenceProvider dataPersistenceProvider) {
320 this.dataPersistenceProvider = dataPersistenceProvider;
324 public T roleChangeNotifier(ActorRef roleChangeNotifier) {
325 this.roleChangeNotifier = roleChangeNotifier;
329 public T snapshotMessageSupport(RaftActorSnapshotMessageSupport snapshotMessageSupport) {
330 this.snapshotMessageSupport = snapshotMessageSupport;
334 public T restoreFromSnapshot(byte[] restoreFromSnapshot) {
335 this.restoreFromSnapshot = restoreFromSnapshot;
339 public T persistent(Optional<Boolean> persistent) {
340 this.persistent = persistent;
344 public T pauseLeaderFunction(Function<Runnable, Void> pauseLeaderFunction) {
345 this.pauseLeaderFunction = pauseLeaderFunction;
// Creates Props for actorClass, passing this builder as the constructor argument.
349 public Props props() {
350 return Props.create(actorClass, this);
354 public static class Builder extends AbstractBuilder<Builder, MockRaftActor> {
356 super(MockRaftActor.class);