2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
3 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
5 * This program and the accompanying materials are made available under the
6 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.raft;
11 import static org.junit.Assert.assertEquals;
12 import static org.mockito.Mockito.mock;
13 import akka.actor.ActorRef;
14 import akka.actor.Props;
15 import akka.japi.Creator;
16 import com.google.common.base.Optional;
17 import com.google.common.util.concurrent.Uninterruptibles;
18 import java.io.ByteArrayInputStream;
19 import java.io.IOException;
20 import java.io.ObjectInputStream;
21 import java.util.ArrayList;
22 import java.util.List;
24 import java.util.concurrent.CountDownLatch;
25 import java.util.concurrent.TimeUnit;
26 import javax.annotation.Nonnull;
27 import org.opendaylight.controller.cluster.DataPersistenceProvider;
28 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
29 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
// Test implementation of RaftActor that also serves as its own recovery cohort and snapshot cohort.
// NOTE(review): every line of this listing carries a leading source line number, and blank,
// brace-only and some statement lines appear to be missing - confirm against the real file.
31 public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort {
// Payload version passed to the RaftActor super constructor.
33 public static final short PAYLOAD_VERSION = 5;
// Mockito mocks (assigned in the constructor) to which several overrides below forward their calls.
35 final RaftActor actorDelegate;
36 final RaftActorRecoveryCohort recoveryCohortDelegate;
37 volatile RaftActorSnapshotCohort snapshotCohortDelegate;
// Counted down by onRecoveryComplete(); awaited by waitForRecoveryComplete().
38 private final CountDownLatch recoveryComplete = new CountDownLatch(1);
// Accumulated state; applySnapshotBytes() appends the elements of a deserialized List to it.
39 private final List<Object> state;
// May be null; set by MockRaftActorCreator.create() and exposed through getRoleChangeNotifier().
40 private ActorRef roleChangeNotifier;
// Counted down by initializeBehavior(); awaited by waitForInitializeBehaviorComplete().
41 protected final CountDownLatch initializeBehaviorComplete = new CountDownLatch(1);
// Optional override returned by newRaftActorRecoverySupport() when non-null.
42 private RaftActorRecoverySupport raftActorRecoverySupport;
// Optional override; otherwise filled in on first call to newRaftActorSnapshotMessageSupport().
43 private RaftActorSnapshotMessageSupport snapshotMessageSupport;
// Akka Creator that builds a MockRaftActor from the arguments captured at construction time.
45 public static final class MockRaftActorCreator implements Creator<MockRaftActor> {
46 private static final long serialVersionUID = 1L;
47 private final Map<String, String> peerAddresses;
48 private final String id;
49 private final Optional<ConfigParams> config;
50 private final DataPersistenceProvider dataPersistenceProvider;
51 private final ActorRef roleChangeNotifier;
// Not a constructor argument; the props(...) overload taking a RaftActorSnapshotMessageSupport
// assigns it after construction.
52 private RaftActorSnapshotMessageSupport snapshotMessageSupport;
// Private constructor; the visible callers are the static props(...) factories of the enclosing class.
54 private MockRaftActorCreator(Map<String, String> peerAddresses, String id,
55 Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider,
56 ActorRef roleChangeNotifier) {
57 this.peerAddresses = peerAddresses;
// NOTE(review): the assignments of id and config (source lines 58-59) are not visible in this
// listing - both fields are final, so they must be assigned; confirm against the real file.
60 this.dataPersistenceProvider = dataPersistenceProvider;
61 this.roleChangeNotifier = roleChangeNotifier;
// Instantiates the actor, then copies the role-change notifier and snapshot message support onto it.
65 public MockRaftActor create() throws Exception {
66 MockRaftActor mockRaftActor = new MockRaftActor(id, peerAddresses, config,
67 dataPersistenceProvider);
68 mockRaftActor.roleChangeNotifier = this.roleChangeNotifier;
69 mockRaftActor.snapshotMessageSupport = snapshotMessageSupport;
// NOTE(review): the return statement (source line 70) is not visible here - presumably returns
// mockRaftActor; confirm.
// Constructs the mock actor: forwards id, peers and config to RaftActor with PAYLOAD_VERSION,
// starts with an empty state list and creates the Mockito delegates.
74 public MockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
75 DataPersistenceProvider dataPersistenceProvider) {
76 super(id, peerAddresses, config, PAYLOAD_VERSION);
77 state = new ArrayList<>();
// Delegates are Mockito mocks; several overrides in this class forward their calls to them.
78 this.actorDelegate = mock(RaftActor.class);
79 this.recoveryCohortDelegate = mock(RaftActorRecoveryCohort.class);
80 this.snapshotCohortDelegate = mock(RaftActorSnapshotCohort.class);
// NOTE(review): the body of the null branch and the else (source lines 82-83) are not visible in
// this listing; presumably a default persistence setting is applied when no provider is given and
// the call below runs only for a non-null provider - confirm.
81 if(dataPersistenceProvider == null){
84 setPersistence(dataPersistenceProvider);
88 public void setRaftActorRecoverySupport(RaftActorRecoverySupport support) {
89 raftActorRecoverySupport = support;
93 public RaftActorRecoverySupport newRaftActorRecoverySupport() {
94 return raftActorRecoverySupport != null ? raftActorRecoverySupport : super.newRaftActorRecoverySupport();
98 protected RaftActorSnapshotMessageSupport newRaftActorSnapshotMessageSupport() {
99 return snapshotMessageSupport != null ? snapshotMessageSupport :
100 (snapshotMessageSupport = super.newRaftActorSnapshotMessageSupport());
103 public RaftActorSnapshotMessageSupport getSnapshotMessageSupport() {
104 return snapshotMessageSupport;
// Waits up to 5 seconds for the recoveryComplete latch (counted down by onRecoveryComplete());
// the JUnit assertion fails if the latch is not released within that time.
107 public void waitForRecoveryComplete() {
// NOTE(review): the opening try (source line 108) is not visible in this listing.
109 assertEquals("Recovery complete", true, recoveryComplete.await(5, TimeUnit.SECONDS));
110 } catch (InterruptedException e) {
// NOTE(review): the handler body is not visible in this listing - confirm how the interrupt is handled.
// Waits up to 5 seconds for the initializeBehaviorComplete latch (counted down by
// initializeBehavior()); the JUnit assertion fails if the latch is not released within that time.
115 public void waitForInitializeBehaviorComplete() {
// NOTE(review): the opening try (source line 116) is not visible in this listing.
117 assertEquals("Behavior initialized", true, initializeBehaviorComplete.await(5, TimeUnit.SECONDS));
118 } catch (InterruptedException e) {
// NOTE(review): the handler body is not visible in this listing - confirm how the interrupt is handled.
// Loops at most 10 times, sleeping 100 ms (uninterruptibly) on each pass.
124 public void waitUntilLeader(){
125 for(int i = 0;i < 10; i++){
// NOTE(review): source lines 126-128 are missing from this listing; presumably they leave the
// loop once this actor has become leader - confirm.
129 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
// Accessor for the accumulated state.
// NOTE(review): the body is not visible in this listing; presumably it returns the state list
// itself (not a copy) - confirm.
133 public List<Object> getState() {
137 public static Props props(final String id, final Map<String, String> peerAddresses,
138 Optional<ConfigParams> config){
139 return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, null));
142 public static Props props(final String id, final Map<String, String> peerAddresses,
143 Optional<ConfigParams> config, RaftActorSnapshotMessageSupport snapshotMessageSupport){
144 MockRaftActorCreator creator = new MockRaftActorCreator(peerAddresses, id, config, null, null);
145 creator.snapshotMessageSupport = snapshotMessageSupport;
146 return Props.create(creator);
149 public static Props props(final String id, final Map<String, String> peerAddresses,
150 Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider){
151 return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, null));
154 public static Props props(final String id, final Map<String, String> peerAddresses,
155 Optional<ConfigParams> config, ActorRef roleChangeNotifier){
156 return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, roleChangeNotifier));
159 public static Props props(final String id, final Map<String, String> peerAddresses,
160 Optional<ConfigParams> config, ActorRef roleChangeNotifier,
161 DataPersistenceProvider dataPersistenceProvider){
162 return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, roleChangeNotifier));
// Forwards the call to the mocked actorDelegate, then logs the applied data together with the
// persistence id.
165 @Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
166 actorDelegate.applyState(clientActor, identifier, data);
167 LOG.info("{}: applyState called: {}", persistenceId(), data);
// NOTE(review): the embedded numbering jumps from 167 to 174, so a further statement may be
// missing from this listing - confirm against the real file.
// Supplies the recovery cohort for this actor.
// NOTE(review): the body is not visible in this listing; since the class itself implements
// RaftActorRecoveryCohort it presumably returns this - confirm.
174 protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
// Supplies the snapshot cohort for this actor.
// NOTE(review): the body is not visible in this listing; since the class itself implements
// RaftActorSnapshotCohort it presumably returns this - confirm.
179 protected RaftActorSnapshotCohort getRaftActorSnapshotCohort() {
// Recovery-cohort callback taking the maximum batch size.
// NOTE(review): no body statements are visible in this listing; the numbering gap suggests the
// body is empty (a no-op) - confirm.
184 public void startLogRecoveryBatch(int maxBatchSize) {
// Recovery-cohort callback receiving one recovered log entry payload.
// NOTE(review): the body is not visible in this listing and the numbering gap leaves room for one
// statement - confirm what, if anything, is done with data.
188 public void appendRecoveredLogEntry(Payload data) {
// Recovery-cohort callback for applying the current recovery batch.
// NOTE(review): no body statements are visible in this listing; the numbering gap suggests the
// body is empty (a no-op) - confirm.
193 public void applyCurrentLogRecoveryBatch() {
197 protected void onRecoveryComplete() {
198 actorDelegate.onRecoveryComplete();
199 recoveryComplete.countDown();
203 protected void initializeBehavior() {
204 super.initializeBehavior();
205 initializeBehaviorComplete.countDown();
209 public void applyRecoverySnapshot(byte[] bytes) {
210 recoveryCohortDelegate.applyRecoverySnapshot(bytes);
211 applySnapshotBytes(bytes);
// Deserializes the snapshot bytes with toObject() and, when the result is a List, appends all of
// its elements to the state list; any other result type is left unused by the visible code.
214 private void applySnapshotBytes(byte[] bytes) {
// NOTE(review): the opening try (source line 215) is not visible in this listing.
216 Object data = toObject(bytes);
217 if (data instanceof List) {
218 state.addAll((List<?>) data);
220 } catch (Exception e) {
// NOTE(review): the handler body is not visible in this listing - confirm whether the failure is
// logged, rethrown or swallowed.
226 public void createSnapshot(ActorRef actorRef) {
227 LOG.info("{}: createSnapshot called", persistenceId());
228 snapshotCohortDelegate.createSnapshot(actorRef);
232 public void applySnapshot(byte [] snapshot) {
233 LOG.info("{}: applySnapshot called", persistenceId());
234 snapshotCohortDelegate.applySnapshot(snapshot);
235 applySnapshotBytes(snapshot);
239 protected void onStateChanged() {
240 actorDelegate.onStateChanged();
244 protected Optional<ActorRef> getRoleChangeNotifier() {
245 return Optional.fromNullable(roleChangeNotifier);
// Persistence id of this actor; used as the prefix of the log messages in this class.
// NOTE(review): the body is not visible in this listing; presumably it returns the actor id -
// confirm.
248 @Override public String persistenceId() {
252 protected void newBehavior(RaftActorBehavior newBehavior) {
253 self().tell(newBehavior, ActorRef.noSender());
// Message entry point. A RaftActorBehavior message (newBehavior() sends these to self) is applied
// through the superclass changeCurrentBehavior().
257 public void handleCommand(final Object message) {
258 if(message instanceof RaftActorBehavior) {
259 super.changeCurrentBehavior((RaftActorBehavior)message);
// NOTE(review): source line 260 is missing from this listing; presumably it is "} else {", so that
// the call below runs only for messages that are not a RaftActorBehavior - confirm.
261 super.handleCommand(message);
// Reads one object from the given bytes using Java native serialization
// (ByteArrayInputStream wrapped in an ObjectInputStream).
// NOTE(review): native deserialization should only ever see trusted bytes; this looks like test
// support code, but confirm it is never fed external data.
265 public static Object toObject(byte[] bs) throws ClassNotFoundException, IOException {
// NOTE(review): the declaration of obj (source line 266), the opening try (269) and the
// cleanup/return section (273-283) are not visible in this listing.
267 ByteArrayInputStream bis = null;
268 ObjectInputStream ois = null;
270 bis = new ByteArrayInputStream(bs);
271 ois = new ObjectInputStream(bis);
272 obj = ois.readObject();
284 public ReplicatedLog getReplicatedLog(){
285 return this.getRaftActorContext().getReplicatedLog();