2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
3 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
5 * This program and the accompanying materials are made available under the
6 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.raft;
11 import static org.junit.Assert.assertEquals;
12 import static org.mockito.Mockito.mock;
13 import akka.actor.ActorRef;
14 import akka.actor.Props;
15 import com.google.common.base.Function;
16 import com.google.common.base.Optional;
17 import com.google.common.util.concurrent.Uninterruptibles;
18 import java.io.ByteArrayInputStream;
19 import java.io.IOException;
20 import java.io.ObjectInputStream;
21 import java.util.ArrayList;
22 import java.util.Collections;
23 import java.util.List;
25 import java.util.concurrent.CountDownLatch;
26 import java.util.concurrent.TimeUnit;
27 import javax.annotation.Nonnull;
28 import org.opendaylight.controller.cluster.DataPersistenceProvider;
29 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
30 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
31 import org.opendaylight.yangtools.concepts.Identifier;
33 public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort {
34 public static final short PAYLOAD_VERSION = 5;
36 final RaftActor actorDelegate;
37 final RaftActorRecoveryCohort recoveryCohortDelegate;
38 volatile RaftActorSnapshotCohort snapshotCohortDelegate;
39 private final CountDownLatch recoveryComplete = new CountDownLatch(1);
40 private final List<Object> state;
41 private final ActorRef roleChangeNotifier;
42 protected final CountDownLatch initializeBehaviorComplete = new CountDownLatch(1);
43 private RaftActorRecoverySupport raftActorRecoverySupport;
44 private RaftActorSnapshotMessageSupport snapshotMessageSupport;
45 private final byte[] restoreFromSnapshot;
46 final CountDownLatch snapshotCommitted = new CountDownLatch(1);
47 private final Function<Runnable, Void> pauseLeaderFunction;
49 protected MockRaftActor(AbstractBuilder<?, ?> builder) {
50 super(builder.id, builder.peerAddresses != null ? builder.peerAddresses :
51 Collections.<String, String>emptyMap(), Optional.fromNullable(builder.config), PAYLOAD_VERSION);
52 state = new ArrayList<>();
53 this.actorDelegate = mock(RaftActor.class);
54 this.recoveryCohortDelegate = mock(RaftActorRecoveryCohort.class);
56 this.snapshotCohortDelegate = builder.snapshotCohort != null ? builder.snapshotCohort :
57 mock(RaftActorSnapshotCohort.class);
59 if(builder.dataPersistenceProvider == null){
60 setPersistence(builder.persistent.isPresent() ? builder.persistent.get() : true);
62 setPersistence(builder.dataPersistenceProvider);
65 roleChangeNotifier = builder.roleChangeNotifier;
66 snapshotMessageSupport = builder.snapshotMessageSupport;
67 restoreFromSnapshot = builder.restoreFromSnapshot;
68 pauseLeaderFunction = builder.pauseLeaderFunction;
71 public void setRaftActorRecoverySupport(RaftActorRecoverySupport support) {
72 raftActorRecoverySupport = support;
76 public RaftActorRecoverySupport newRaftActorRecoverySupport() {
77 return raftActorRecoverySupport != null ? raftActorRecoverySupport : super.newRaftActorRecoverySupport();
81 protected RaftActorSnapshotMessageSupport newRaftActorSnapshotMessageSupport() {
82 return snapshotMessageSupport != null ? snapshotMessageSupport :
83 (snapshotMessageSupport = super.newRaftActorSnapshotMessageSupport());
86 public RaftActorSnapshotMessageSupport getSnapshotMessageSupport() {
87 return snapshotMessageSupport;
90 public void waitForRecoveryComplete() {
92 assertEquals("Recovery complete", true, recoveryComplete.await(5, TimeUnit.SECONDS));
93 } catch (InterruptedException e) {
98 public void waitForInitializeBehaviorComplete() {
100 assertEquals("Behavior initialized", true, initializeBehaviorComplete.await(5, TimeUnit.SECONDS));
101 } catch (InterruptedException e) {
107 public void waitUntilLeader(){
108 for(int i = 0;i < 10; i++){
112 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
116 public List<Object> getState() {
121 protected void applyState(ActorRef clientActor, Identifier identifier, Object data) {
122 actorDelegate.applyState(clientActor, identifier, data);
123 LOG.info("{}: applyState called: {}", persistenceId(), data);
130 protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
135 protected RaftActorSnapshotCohort getRaftActorSnapshotCohort() {
140 public void startLogRecoveryBatch(int maxBatchSize) {
144 public void appendRecoveredLogEntry(Payload data) {
149 public void applyCurrentLogRecoveryBatch() {
153 protected void onRecoveryComplete() {
154 actorDelegate.onRecoveryComplete();
155 recoveryComplete.countDown();
159 protected void initializeBehavior() {
160 super.initializeBehavior();
161 initializeBehaviorComplete.countDown();
165 public void applyRecoverySnapshot(byte[] bytes) {
166 recoveryCohortDelegate.applyRecoverySnapshot(bytes);
167 applySnapshotBytes(bytes);
170 private void applySnapshotBytes(byte[] bytes) {
172 Object data = toObject(bytes);
173 if (data instanceof List) {
175 state.addAll((List<?>) data);
177 } catch (Exception e) {
183 public void createSnapshot(ActorRef actorRef) {
184 LOG.info("{}: createSnapshot called", persistenceId());
185 snapshotCohortDelegate.createSnapshot(actorRef);
189 public void applySnapshot(byte [] snapshot) {
190 LOG.info("{}: applySnapshot called", persistenceId());
191 applySnapshotBytes(snapshot);
192 snapshotCohortDelegate.applySnapshot(snapshot);
196 protected void onStateChanged() {
197 actorDelegate.onStateChanged();
201 protected Optional<ActorRef> getRoleChangeNotifier() {
202 return Optional.fromNullable(roleChangeNotifier);
205 @Override public String persistenceId() {
209 protected void newBehavior(RaftActorBehavior newBehavior) {
210 self().tell(newBehavior, ActorRef.noSender());
214 protected void handleCommand(final Object message) {
215 if(message instanceof RaftActorBehavior) {
216 super.changeCurrentBehavior((RaftActorBehavior)message);
218 super.handleCommand(message);
220 if(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT.equals(message)) {
221 snapshotCommitted.countDown();
227 protected void pauseLeader(Runnable operation) {
228 if(pauseLeaderFunction != null) {
229 pauseLeaderFunction.apply(operation);
231 super.pauseLeader(operation);
235 public static Object toObject(byte[] bs) throws ClassNotFoundException, IOException {
237 ByteArrayInputStream bis = null;
238 ObjectInputStream ois = null;
240 bis = new ByteArrayInputStream(bs);
241 ois = new ObjectInputStream(bis);
242 obj = ois.readObject();
254 public ReplicatedLog getReplicatedLog(){
255 return this.getRaftActorContext().getReplicatedLog();
259 public byte[] getRestoreFromSnapshot() {
260 return restoreFromSnapshot;
263 public static Props props(final String id, final Map<String, String> peerAddresses,
264 ConfigParams config){
265 return builder().id(id).peerAddresses(peerAddresses).config(config).props();
268 public static Props props(final String id, final Map<String, String> peerAddresses,
269 ConfigParams config, DataPersistenceProvider dataPersistenceProvider){
270 return builder().id(id).peerAddresses(peerAddresses).config(config).
271 dataPersistenceProvider(dataPersistenceProvider).props();
274 public static Builder builder() {
275 return new Builder();
278 public static class AbstractBuilder<T extends AbstractBuilder<T, A>, A extends MockRaftActor> {
279 private Map<String, String> peerAddresses = Collections.emptyMap();
281 private ConfigParams config;
282 private DataPersistenceProvider dataPersistenceProvider;
283 private ActorRef roleChangeNotifier;
284 private RaftActorSnapshotMessageSupport snapshotMessageSupport;
285 private byte[] restoreFromSnapshot;
286 private Optional<Boolean> persistent = Optional.absent();
287 private final Class<A> actorClass;
288 private Function<Runnable, Void> pauseLeaderFunction;
289 private RaftActorSnapshotCohort snapshotCohort;
291 protected AbstractBuilder(Class<A> actorClass) {
292 this.actorClass = actorClass;
295 @SuppressWarnings("unchecked")
300 public T id(String id) {
305 public T peerAddresses(Map<String, String> peerAddresses) {
306 this.peerAddresses = peerAddresses;
310 public T config(ConfigParams config) {
311 this.config = config;
315 public T dataPersistenceProvider(DataPersistenceProvider dataPersistenceProvider) {
316 this.dataPersistenceProvider = dataPersistenceProvider;
320 public T roleChangeNotifier(ActorRef roleChangeNotifier) {
321 this.roleChangeNotifier = roleChangeNotifier;
325 public T snapshotMessageSupport(RaftActorSnapshotMessageSupport snapshotMessageSupport) {
326 this.snapshotMessageSupport = snapshotMessageSupport;
330 public T restoreFromSnapshot(byte[] restoreFromSnapshot) {
331 this.restoreFromSnapshot = restoreFromSnapshot;
335 public T persistent(Optional<Boolean> persistent) {
336 this.persistent = persistent;
340 public T pauseLeaderFunction(Function<Runnable, Void> pauseLeaderFunction) {
341 this.pauseLeaderFunction = pauseLeaderFunction;
345 public T snapshotCohort(RaftActorSnapshotCohort snapshotCohort) {
346 this.snapshotCohort = snapshotCohort;
350 public Props props() {
351 return Props.create(actorClass, this);
355 public static class Builder extends AbstractBuilder<Builder, MockRaftActor> {
357 super(MockRaftActor.class);