2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
3 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
5 * This program and the accompanying materials are made available under the
6 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.raft;
11 import static org.junit.Assert.assertEquals;
12 import static org.mockito.Mockito.mock;
13 import akka.actor.ActorRef;
14 import akka.actor.Props;
15 import com.google.common.base.Optional;
16 import com.google.common.util.concurrent.Uninterruptibles;
17 import java.io.ByteArrayInputStream;
18 import java.io.IOException;
19 import java.io.ObjectInputStream;
20 import java.util.ArrayList;
21 import java.util.Collections;
22 import java.util.List;
24 import java.util.concurrent.CountDownLatch;
25 import java.util.concurrent.TimeUnit;
26 import javax.annotation.Nonnull;
27 import org.opendaylight.controller.cluster.DataPersistenceProvider;
28 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
29 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
/**
 * RaftActor subclass used by tests. It implements the recovery and snapshot
 * cohort interfaces itself, forwards selected callbacks to Mockito mocks so
 * tests can verify them, and exposes latches that tests can wait on.
 */
31 public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort {
// Payload version passed as the last argument to the RaftActor super constructor.
33 public static final short PAYLOAD_VERSION = 5;
// Mockito mock that receives applyState, onRecoveryComplete and onStateChanged calls.
35 final RaftActor actorDelegate;
// Mockito mock that receives applyRecoverySnapshot calls.
36 final RaftActorRecoveryCohort recoveryCohortDelegate;
// Receives createSnapshot/applySnapshot calls; starts as a Mockito mock and is
// non-final and volatile, so it can be reassigned after construction.
37 volatile RaftActorSnapshotCohort snapshotCohortDelegate;
// Counted down in onRecoveryComplete(); awaited by waitForRecoveryComplete().
38 private final CountDownLatch recoveryComplete = new CountDownLatch(1);
// Accumulates the elements of any List deserialized from snapshot bytes.
39 private final List<Object> state;
// Taken from the builder; may be null (wrapped with Optional.fromNullable when exposed).
40 private final ActorRef roleChangeNotifier;
// Counted down after super.initializeBehavior() returns.
41 protected final CountDownLatch initializeBehaviorComplete = new CountDownLatch(1);
// Optional override returned by newRaftActorRecoverySupport() when non-null.
42 private RaftActorRecoverySupport raftActorRecoverySupport;
// Supplied by the builder, or created and cached on first request when null.
43 private RaftActorSnapshotMessageSupport snapshotMessageSupport;
// Taken from the builder; returned as-is by getRestoreFromSnapshot().
44 private final byte[] restoreFromSnapshot;
// Counted down in handleCommand() when the COMMIT_SNAPSHOT message is seen.
45 final CountDownLatch snapshotCommitted = new CountDownLatch(1);
/**
 * Creates the actor from a {@link Builder}.
 *
 * @param builder source of the id, peer addresses, config, persistence settings,
 *                role-change notifier, snapshot message support and restore bytes
 */
47 protected MockRaftActor(Builder builder) {
// A null config is tolerated: it is wrapped with Optional.fromNullable.
48 super(builder.id, builder.peerAddresses, Optional.fromNullable(builder.config), PAYLOAD_VERSION);
49 state = new ArrayList<>();
// All three delegates start out as Mockito mocks.
50 this.actorDelegate = mock(RaftActor.class);
51 this.recoveryCohortDelegate = mock(RaftActorRecoveryCohort.class);
52 this.snapshotCohortDelegate = mock(RaftActorSnapshotCohort.class);
// Without an explicit provider, use the builder's persistent flag, defaulting to true.
54 if(builder.dataPersistenceProvider == null){
55 setPersistence(builder.persistent.isPresent() ? builder.persistent.get() : true);
// NOTE(review): presumably the else branch (the else line is not in this listing) -
// installs the provider supplied through the builder.
57 setPersistence(builder.dataPersistenceProvider);
60 roleChangeNotifier = builder.roleChangeNotifier;
61 snapshotMessageSupport = builder.snapshotMessageSupport;
62 restoreFromSnapshot = builder.restoreFromSnapshot;
/**
 * Stores a recovery support instance that newRaftActorRecoverySupport() will
 * return in place of the superclass default.
 *
 * @param support the instance to return; null leaves the superclass default in effect
 */
65 public void setRaftActorRecoverySupport(RaftActorRecoverySupport support) {
66 raftActorRecoverySupport = support;
/**
 * Returns the recovery support stored via setRaftActorRecoverySupport when one
 * has been set; otherwise falls back to the superclass factory.
 */
70 public RaftActorRecoverySupport newRaftActorRecoverySupport() {
71 return raftActorRecoverySupport != null ? raftActorRecoverySupport : super.newRaftActorRecoverySupport();
/**
 * Returns the snapshot message support held in the field. When the field is
 * null, an instance is obtained from the superclass factory and stored in the
 * field, so later calls and getSnapshotMessageSupport() see the same object.
 */
75 protected RaftActorSnapshotMessageSupport newRaftActorSnapshotMessageSupport() {
76 return snapshotMessageSupport != null ? snapshotMessageSupport :
// Assignment inside the expression caches the newly created instance.
77 (snapshotMessageSupport = super.newRaftActorSnapshotMessageSupport());
/**
 * Returns the current snapshot message support field value, which is null if
 * none was supplied by the builder and none has been created yet.
 */
80 public RaftActorSnapshotMessageSupport getSnapshotMessageSupport() {
81 return snapshotMessageSupport;
/**
 * Waits up to 5 seconds for the recoveryComplete latch and fails a JUnit
 * assertion ("Recovery complete") if it is not released in time.
 */
84 public void waitForRecoveryComplete() {
86 assertEquals("Recovery complete", true, recoveryComplete.await(5, TimeUnit.SECONDS));
// NOTE(review): the handler body is not in this listing - confirm how interruption is handled.
87 } catch (InterruptedException e) {
/**
 * Waits up to 5 seconds for the initializeBehaviorComplete latch and fails a
 * JUnit assertion ("Behavior initialized") if it is not released in time.
 */
92 public void waitForInitializeBehaviorComplete() {
94 assertEquals("Behavior initialized", true, initializeBehaviorComplete.await(5, TimeUnit.SECONDS));
// NOTE(review): the handler body is not in this listing - confirm how interruption is handled.
95 } catch (InterruptedException e) {
/**
 * Polls in a loop of at most 10 iterations, with a 100 ms uninterruptible sleep.
 * NOTE(review): the exit condition (presumably a leadership check, going by the
 * method name) sits on lines that are not in this listing - confirm.
 */
101 public void waitUntilLeader(){
102 for(int i = 0;i < 10; i++){
106 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
/**
 * Accessor typed as the list of state objects.
 * NOTE(review): the return statement is not in this listing; presumably it
 * returns the {@code state} field itself rather than a copy - confirm.
 */
110 public List<Object> getState() {
/**
 * Forwards the call, arguments unchanged, to actorDelegate and then logs the
 * applied data at info level.
 *
 * @param clientActor passed through to the delegate
 * @param identifier  passed through to the delegate
 * @param data        passed through to the delegate and included in the log line
 */
115 @Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
116 actorDelegate.applyState(clientActor, identifier, data);
117 LOG.info("{}: applyState called: {}", persistenceId(), data);
/**
 * Supplies the recovery cohort for this actor.
 * NOTE(review): the body is not in this listing; since this class implements
 * RaftActorRecoveryCohort it presumably returns {@code this} - confirm.
 */
124 protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
/**
 * Supplies the snapshot cohort for this actor.
 * NOTE(review): the body is not in this listing; since this class implements
 * RaftActorSnapshotCohort it presumably returns {@code this} - confirm.
 */
129 protected RaftActorSnapshotCohort getRaftActorSnapshotCohort() {
/**
 * Recovery hook that receives the maximum batch size.
 * NOTE(review): the body is not in this listing, and this is presumably a
 * RaftActorRecoveryCohort method - confirm both.
 */
134 public void startLogRecoveryBatch(int maxBatchSize) {
/**
 * Recovery hook that receives one recovered payload.
 * NOTE(review): the body is not in this listing, and this is presumably a
 * RaftActorRecoveryCohort method - confirm both.
 */
138 public void appendRecoveredLogEntry(Payload data) {
/**
 * Recovery hook taking no arguments.
 * NOTE(review): the body is not in this listing, and this is presumably a
 * RaftActorRecoveryCohort method - confirm both.
 */
143 public void applyCurrentLogRecoveryBatch() {
/**
 * Records the callback on actorDelegate, then releases the recoveryComplete
 * latch so that waitForRecoveryComplete() can return.
 */
147 protected void onRecoveryComplete() {
148 actorDelegate.onRecoveryComplete();
149 recoveryComplete.countDown();
/**
 * Runs the superclass behavior initialization and then releases the
 * initializeBehaviorComplete latch so that waitForInitializeBehaviorComplete()
 * can return.
 */
153 protected void initializeBehavior() {
154 super.initializeBehavior();
155 initializeBehaviorComplete.countDown();
/**
 * Forwards the snapshot bytes to recoveryCohortDelegate, then passes the same
 * bytes to applySnapshotBytes.
 *
 * @param bytes serialized snapshot, handed on unchanged
 */
159 public void applyRecoverySnapshot(byte[] bytes) {
160 recoveryCohortDelegate.applyRecoverySnapshot(bytes);
161 applySnapshotBytes(bytes);
/**
 * Deserializes the bytes with toObject and, when the result is a List, appends
 * all of its elements to {@code state}. Non-List results are not added.
 *
 * @param bytes serialized snapshot
 */
164 private void applySnapshotBytes(byte[] bytes) {
166 Object data = toObject(bytes);
167 if (data instanceof List) {
// Appends to the existing contents; nothing visible here clears the list first.
168 state.addAll((List<?>) data);
// NOTE(review): the handler body is not in this listing - confirm failures are not silently dropped.
170 } catch (Exception e) {
/**
 * Logs the call at info level and forwards it to snapshotCohortDelegate.
 *
 * @param actorRef passed through to the delegate unchanged
 */
176 public void createSnapshot(ActorRef actorRef) {
177 LOG.info("{}: createSnapshot called", persistenceId());
178 snapshotCohortDelegate.createSnapshot(actorRef);
/**
 * Logs the call at info level, passes the snapshot to applySnapshotBytes, and
 * then forwards the same bytes to snapshotCohortDelegate.
 *
 * @param snapshot serialized snapshot bytes
 */
182 public void applySnapshot(byte [] snapshot) {
183 LOG.info("{}: applySnapshot called", persistenceId());
184 applySnapshotBytes(snapshot);
185 snapshotCohortDelegate.applySnapshot(snapshot);
/**
 * Forwards the state-change callback to actorDelegate.
 */
189 protected void onStateChanged() {
190 actorDelegate.onStateChanged();
/**
 * Returns the role-change notifier supplied by the builder, or an absent
 * Optional when none was supplied (the field is null).
 */
194 protected Optional<ActorRef> getRoleChangeNotifier() {
195 return Optional.fromNullable(roleChangeNotifier);
/**
 * Returns the identifier used as the prefix of this class's log lines.
 * NOTE(review): the body is not in this listing; presumably it returns the
 * actor id - confirm.
 */
198 @Override public String persistenceId() {
/**
 * Sends the behavior to this actor as a message with no sender.
 * handleCommand passes RaftActorBehavior messages to super.changeCurrentBehavior.
 *
 * @param newBehavior the behavior to send to self
 */
202 protected void newBehavior(RaftActorBehavior newBehavior) {
203 self().tell(newBehavior, ActorRef.noSender());
/**
 * Handles an incoming message. A RaftActorBehavior message is applied through
 * super.changeCurrentBehavior; the listing also shows delegation to
 * super.handleCommand and a latch release on COMMIT_SNAPSHOT.
 * NOTE(review): the else/brace lines are not in this listing, so the exact
 * nesting of the last two steps is presumed - confirm.
 *
 * @param message the received message
 */
207 public void handleCommand(final Object message) {
208 if(message instanceof RaftActorBehavior) {
209 super.changeCurrentBehavior((RaftActorBehavior)message);
// Presumably the else branch: regular messages go to the superclass handler.
211 super.handleCommand(message);
// Lets tests wait on snapshotCommitted until a snapshot commit message is seen.
213 if(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT.equals(message)) {
214 snapshotCommitted.countDown();
/**
 * Deserializes one object from the byte array using Java serialization.
 * NOTE(review): native Java deserialization - use only with trusted bytes.
 * NOTE(review): the declaration of {@code obj}, the stream cleanup and the
 * return statement are not in this listing - confirm the streams are closed.
 *
 * @param bs serialized form of a single object
 * @throws ClassNotFoundException declared by the signature; presumably from readObject
 * @throws IOException declared by the signature; presumably from the stream operations
 */
219 public static Object toObject(byte[] bs) throws ClassNotFoundException, IOException {
// Declared up front and initialised to null.
221 ByteArrayInputStream bis = null;
222 ObjectInputStream ois = null;
224 bis = new ByteArrayInputStream(bs);
225 ois = new ObjectInputStream(bis);
226 obj = ois.readObject();
/**
 * Returns the replicated log held by this actor's RaftActorContext.
 */
238 public ReplicatedLog getReplicatedLog(){
239 return this.getRaftActorContext().getReplicatedLog();
/**
 * Returns the restore-from-snapshot bytes supplied by the builder. The array is
 * returned as-is (no copy) and is null when none was supplied.
 */
243 public byte[] getRestoreFromSnapshot() {
244 return restoreFromSnapshot;
/**
 * Convenience factory: builds Props through the Builder from an id, peer
 * addresses and config, leaving every other builder setting at its default.
 *
 * @param id            actor id
 * @param peerAddresses peer addresses passed to the builder
 * @param config        config passed to the builder
 */
247 public static Props props(final String id, final Map<String, String> peerAddresses,
248 ConfigParams config){
249 return builder().id(id).peerAddresses(peerAddresses).config(config).props();
/**
 * Convenience factory: same as the three-argument overload, and additionally
 * sets an explicit DataPersistenceProvider on the builder.
 *
 * @param id                      actor id
 * @param peerAddresses           peer addresses passed to the builder
 * @param config                  config passed to the builder
 * @param dataPersistenceProvider persistence provider passed to the builder
 */
252 public static Props props(final String id, final Map<String, String> peerAddresses,
253 ConfigParams config, DataPersistenceProvider dataPersistenceProvider){
254 return builder().id(id).peerAddresses(peerAddresses).config(config).
255 dataPersistenceProvider(dataPersistenceProvider).props();
/**
 * Returns a new Builder with all settings at their defaults.
 */
258 public static Builder builder() {
259 return new Builder();
/**
 * Fluent builder for MockRaftActor Props. Each setter-style method is declared
 * to return Builder so calls can be chained.
 * NOTE(review): the {@code id} field declaration, the body of id(String) and
 * the return statements are not in this listing; presumably each method
 * returns {@code this} - confirm.
 */
262 public static class Builder {
// Defaults to an empty map when peerAddresses(...) is never called.
263 private Map<String, String> peerAddresses = Collections.emptyMap();
// May stay null; the actor constructor wraps it with Optional.fromNullable.
265 private ConfigParams config;
// When null, the actor constructor falls back to the boolean persistent flag.
266 private DataPersistenceProvider dataPersistenceProvider;
267 private ActorRef roleChangeNotifier;
268 private RaftActorSnapshotMessageSupport snapshotMessageSupport;
269 private byte[] restoreFromSnapshot;
// Absent by default; the actor constructor treats absent as persistent = true.
270 private Optional<Boolean> persistent = Optional.absent();
// Sets the actor id (the assignment line is not in this listing).
272 public Builder id(String id) {
// Replaces the default empty peer address map.
277 public Builder peerAddresses(Map<String, String> peerAddresses) {
278 this.peerAddresses = peerAddresses;
// Sets the config handed to the RaftActor super constructor.
282 public Builder config(ConfigParams config) {
283 this.config = config;
// Sets an explicit persistence provider.
287 public Builder dataPersistenceProvider(DataPersistenceProvider dataPersistenceProvider) {
288 this.dataPersistenceProvider = dataPersistenceProvider;
// Sets the actor exposed through getRoleChangeNotifier().
292 public Builder roleChangeNotifier(ActorRef roleChangeNotifier) {
293 this.roleChangeNotifier = roleChangeNotifier;
// Sets the snapshot message support that the actor uses in place of creating one.
297 public Builder snapshotMessageSupport(RaftActorSnapshotMessageSupport snapshotMessageSupport) {
298 this.snapshotMessageSupport = snapshotMessageSupport;
// Stores the array reference directly (no copy).
302 public Builder restoreFromSnapshot(byte[] restoreFromSnapshot) {
303 this.restoreFromSnapshot = restoreFromSnapshot;
// Sets the persistence flag used only when no DataPersistenceProvider is given.
307 public Builder persistent(Optional<Boolean> persistent) {
308 this.persistent = persistent;
// Creates Props that pass this builder to the MockRaftActor constructor.
312 public Props props() {
313 return Props.create(MockRaftActor.class, this);