2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
3 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
5 * This program and the accompanying materials are made available under the
6 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.raft;
11 import static org.junit.Assert.assertEquals;
12 import static org.mockito.Mockito.mock;
13 import akka.actor.ActorRef;
14 import akka.actor.Props;
15 import akka.japi.Creator;
16 import com.google.common.base.Optional;
17 import com.google.common.util.concurrent.Uninterruptibles;
18 import java.io.ByteArrayInputStream;
19 import java.io.IOException;
20 import java.io.ObjectInputStream;
21 import java.util.ArrayList;
22 import java.util.List;
24 import java.util.concurrent.CountDownLatch;
25 import java.util.concurrent.TimeUnit;
26 import javax.annotation.Nonnull;
27 import org.opendaylight.controller.cluster.DataPersistenceProvider;
28 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
30 public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort {
32 final RaftActor actorDelegate;
33 final RaftActorRecoveryCohort recoveryCohortDelegate;
34 final RaftActorSnapshotCohort snapshotCohortDelegate;
35 private final CountDownLatch recoveryComplete = new CountDownLatch(1);
36 private final List<Object> state;
37 private ActorRef roleChangeNotifier;
38 private final CountDownLatch initializeBehaviorComplete = new CountDownLatch(1);
39 private RaftActorRecoverySupport raftActorRecoverySupport;
41 public static final class MockRaftActorCreator implements Creator<MockRaftActor> {
42 private static final long serialVersionUID = 1L;
43 private final Map<String, String> peerAddresses;
44 private final String id;
45 private final Optional<ConfigParams> config;
46 private final DataPersistenceProvider dataPersistenceProvider;
47 private final ActorRef roleChangeNotifier;
49 private MockRaftActorCreator(Map<String, String> peerAddresses, String id,
50 Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider,
51 ActorRef roleChangeNotifier) {
52 this.peerAddresses = peerAddresses;
55 this.dataPersistenceProvider = dataPersistenceProvider;
56 this.roleChangeNotifier = roleChangeNotifier;
60 public MockRaftActor create() throws Exception {
61 MockRaftActor mockRaftActor = new MockRaftActor(id, peerAddresses, config,
62 dataPersistenceProvider);
63 mockRaftActor.roleChangeNotifier = this.roleChangeNotifier;
68 public MockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
69 DataPersistenceProvider dataPersistenceProvider) {
70 super(id, peerAddresses, config);
71 state = new ArrayList<>();
72 this.actorDelegate = mock(RaftActor.class);
73 this.recoveryCohortDelegate = mock(RaftActorRecoveryCohort.class);
74 this.snapshotCohortDelegate = mock(RaftActorSnapshotCohort.class);
75 if(dataPersistenceProvider == null){
78 setPersistence(dataPersistenceProvider);
82 public void setRaftActorRecoverySupport(RaftActorRecoverySupport support) {
83 raftActorRecoverySupport = support;
87 public RaftActorRecoverySupport newRaftActorRecoverySupport() {
88 return raftActorRecoverySupport != null ? raftActorRecoverySupport : super.newRaftActorRecoverySupport();
91 public void waitForRecoveryComplete() {
93 assertEquals("Recovery complete", true, recoveryComplete.await(5, TimeUnit.SECONDS));
94 } catch (InterruptedException e) {
99 public void waitForInitializeBehaviorComplete() {
101 assertEquals("Behavior initialized", true, initializeBehaviorComplete.await(5, TimeUnit.SECONDS));
102 } catch (InterruptedException e) {
108 public void waitUntilLeader(){
109 for(int i = 0;i < 10; i++){
113 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
117 public List<Object> getState() {
121 public static Props props(final String id, final Map<String, String> peerAddresses,
122 Optional<ConfigParams> config){
123 return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, null));
126 public static Props props(final String id, final Map<String, String> peerAddresses,
127 Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider){
128 return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, null));
131 public static Props props(final String id, final Map<String, String> peerAddresses,
132 Optional<ConfigParams> config, ActorRef roleChangeNotifier){
133 return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, roleChangeNotifier));
136 public static Props props(final String id, final Map<String, String> peerAddresses,
137 Optional<ConfigParams> config, ActorRef roleChangeNotifier,
138 DataPersistenceProvider dataPersistenceProvider){
139 return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, roleChangeNotifier));
143 @Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
144 actorDelegate.applyState(clientActor, identifier, data);
145 LOG.info("{}: applyState called", persistenceId());
150 protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
155 protected RaftActorSnapshotCohort getRaftActorSnapshotCohort() {
160 public void startLogRecoveryBatch(int maxBatchSize) {
164 public void appendRecoveredLogEntry(Payload data) {
169 public void applyCurrentLogRecoveryBatch() {
173 protected void onRecoveryComplete() {
174 actorDelegate.onRecoveryComplete();
175 recoveryComplete.countDown();
179 protected void initializeBehavior() {
180 super.initializeBehavior();
181 initializeBehaviorComplete.countDown();
185 public void applyRecoverySnapshot(byte[] bytes) {
186 recoveryCohortDelegate.applyRecoverySnapshot(bytes);
188 Object data = toObject(bytes);
189 if (data instanceof List) {
190 state.addAll((List<?>) data);
192 } catch (Exception e) {
198 public void createSnapshot(ActorRef actorRef) {
199 LOG.info("{}: createSnapshot called", persistenceId());
200 snapshotCohortDelegate.createSnapshot(actorRef);
204 public void applySnapshot(byte [] snapshot) {
205 LOG.info("{}: applySnapshot called", persistenceId());
206 snapshotCohortDelegate.applySnapshot(snapshot);
210 protected void onStateChanged() {
211 actorDelegate.onStateChanged();
215 protected Optional<ActorRef> getRoleChangeNotifier() {
216 return Optional.fromNullable(roleChangeNotifier);
219 @Override public String persistenceId() {
223 private Object toObject(byte[] bs) throws ClassNotFoundException, IOException {
225 ByteArrayInputStream bis = null;
226 ObjectInputStream ois = null;
228 bis = new ByteArrayInputStream(bs);
229 ois = new ObjectInputStream(bis);
230 obj = ois.readObject();
242 public ReplicatedLog getReplicatedLog(){
243 return this.getRaftActorContext().getReplicatedLog();