Add unit tests for RaftActorRecoverySupport
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / MockRaftActor.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
4  *
5  * This program and the accompanying materials are made available under the
6  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7  * and is available at http://www.eclipse.org/legal/epl-v10.html
8  */
9 package org.opendaylight.controller.cluster.raft;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.mockito.Mockito.mock;
13 import akka.actor.ActorRef;
14 import akka.actor.Props;
15 import akka.japi.Creator;
16 import com.google.common.base.Optional;
17 import com.google.common.util.concurrent.Uninterruptibles;
18 import java.io.ByteArrayInputStream;
19 import java.io.IOException;
20 import java.io.ObjectInputStream;
21 import java.util.ArrayList;
22 import java.util.List;
23 import java.util.Map;
24 import java.util.concurrent.CountDownLatch;
25 import java.util.concurrent.TimeUnit;
26 import javax.annotation.Nonnull;
27 import org.opendaylight.controller.cluster.DataPersistenceProvider;
28 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
29
30 public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort {
31
32     final RaftActor actorDelegate;
33     final RaftActorRecoveryCohort recoveryCohortDelegate;
34     final RaftActorSnapshotCohort snapshotCohortDelegate;
35     private final CountDownLatch recoveryComplete = new CountDownLatch(1);
36     private final List<Object> state;
37     private ActorRef roleChangeNotifier;
38     private final CountDownLatch initializeBehaviorComplete = new CountDownLatch(1);
39     private RaftActorRecoverySupport raftActorRecoverySupport;
40
41     public static final class MockRaftActorCreator implements Creator<MockRaftActor> {
42         private static final long serialVersionUID = 1L;
43         private final Map<String, String> peerAddresses;
44         private final String id;
45         private final Optional<ConfigParams> config;
46         private final DataPersistenceProvider dataPersistenceProvider;
47         private final ActorRef roleChangeNotifier;
48
49         private MockRaftActorCreator(Map<String, String> peerAddresses, String id,
50             Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider,
51             ActorRef roleChangeNotifier) {
52             this.peerAddresses = peerAddresses;
53             this.id = id;
54             this.config = config;
55             this.dataPersistenceProvider = dataPersistenceProvider;
56             this.roleChangeNotifier = roleChangeNotifier;
57         }
58
59         @Override
60         public MockRaftActor create() throws Exception {
61             MockRaftActor mockRaftActor = new MockRaftActor(id, peerAddresses, config,
62                 dataPersistenceProvider);
63             mockRaftActor.roleChangeNotifier = this.roleChangeNotifier;
64             return mockRaftActor;
65         }
66     }
67
68     public MockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
69                          DataPersistenceProvider dataPersistenceProvider) {
70         super(id, peerAddresses, config);
71         state = new ArrayList<>();
72         this.actorDelegate = mock(RaftActor.class);
73         this.recoveryCohortDelegate = mock(RaftActorRecoveryCohort.class);
74         this.snapshotCohortDelegate = mock(RaftActorSnapshotCohort.class);
75         if(dataPersistenceProvider == null){
76             setPersistence(true);
77         } else {
78             setPersistence(dataPersistenceProvider);
79         }
80     }
81
82     public void setRaftActorRecoverySupport(RaftActorRecoverySupport support) {
83         raftActorRecoverySupport = support;
84     }
85
86     @Override
87     public RaftActorRecoverySupport newRaftActorRecoverySupport() {
88         return raftActorRecoverySupport != null ? raftActorRecoverySupport : super.newRaftActorRecoverySupport();
89     }
90
91     public void waitForRecoveryComplete() {
92         try {
93             assertEquals("Recovery complete", true, recoveryComplete.await(5,  TimeUnit.SECONDS));
94         } catch (InterruptedException e) {
95             e.printStackTrace();
96         }
97     }
98
99     public void waitForInitializeBehaviorComplete() {
100         try {
101             assertEquals("Behavior initialized", true, initializeBehaviorComplete.await(5,  TimeUnit.SECONDS));
102         } catch (InterruptedException e) {
103             e.printStackTrace();
104         }
105     }
106
107
108     public void waitUntilLeader(){
109         for(int i = 0;i < 10; i++){
110             if(isLeader()){
111                 break;
112             }
113             Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
114         }
115     }
116
117     public List<Object> getState() {
118         return state;
119     }
120
121     public static Props props(final String id, final Map<String, String> peerAddresses,
122             Optional<ConfigParams> config){
123         return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, null));
124     }
125
126     public static Props props(final String id, final Map<String, String> peerAddresses,
127                               Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider){
128         return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, null));
129     }
130
131     public static Props props(final String id, final Map<String, String> peerAddresses,
132         Optional<ConfigParams> config, ActorRef roleChangeNotifier){
133         return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, roleChangeNotifier));
134     }
135
136     public static Props props(final String id, final Map<String, String> peerAddresses,
137                               Optional<ConfigParams> config, ActorRef roleChangeNotifier,
138                               DataPersistenceProvider dataPersistenceProvider){
139         return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, roleChangeNotifier));
140     }
141
142
143     @Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
144         actorDelegate.applyState(clientActor, identifier, data);
145         LOG.info("{}: applyState called", persistenceId());
146     }
147
148     @Override
149     @Nonnull
150     protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
151         return this;
152     }
153
154     @Override
155     protected RaftActorSnapshotCohort getRaftActorSnapshotCohort() {
156         return this;
157     }
158
159     @Override
160     public void startLogRecoveryBatch(int maxBatchSize) {
161     }
162
163     @Override
164     public void appendRecoveredLogEntry(Payload data) {
165         state.add(data);
166     }
167
168     @Override
169     public void applyCurrentLogRecoveryBatch() {
170     }
171
172     @Override
173     protected void onRecoveryComplete() {
174         actorDelegate.onRecoveryComplete();
175         recoveryComplete.countDown();
176     }
177
178     @Override
179     protected void initializeBehavior() {
180         super.initializeBehavior();
181         initializeBehaviorComplete.countDown();
182     }
183
184     @Override
185     public void applyRecoverySnapshot(byte[] bytes) {
186         recoveryCohortDelegate.applyRecoverySnapshot(bytes);
187         try {
188             Object data = toObject(bytes);
189             if (data instanceof List) {
190                 state.addAll((List<?>) data);
191             }
192         } catch (Exception e) {
193             e.printStackTrace();
194         }
195     }
196
197     @Override
198     public void createSnapshot(ActorRef actorRef) {
199         LOG.info("{}: createSnapshot called", persistenceId());
200         snapshotCohortDelegate.createSnapshot(actorRef);
201     }
202
203     @Override
204     public void applySnapshot(byte [] snapshot) {
205         LOG.info("{}: applySnapshot called", persistenceId());
206         snapshotCohortDelegate.applySnapshot(snapshot);
207     }
208
209     @Override
210     protected void onStateChanged() {
211         actorDelegate.onStateChanged();
212     }
213
214     @Override
215     protected Optional<ActorRef> getRoleChangeNotifier() {
216         return Optional.fromNullable(roleChangeNotifier);
217     }
218
219     @Override public String persistenceId() {
220         return this.getId();
221     }
222
223     private Object toObject(byte[] bs) throws ClassNotFoundException, IOException {
224         Object obj = null;
225         ByteArrayInputStream bis = null;
226         ObjectInputStream ois = null;
227         try {
228             bis = new ByteArrayInputStream(bs);
229             ois = new ObjectInputStream(bis);
230             obj = ois.readObject();
231         } finally {
232             if (bis != null) {
233                 bis.close();
234             }
235             if (ois != null) {
236                 ois.close();
237             }
238         }
239         return obj;
240     }
241
242     public ReplicatedLog getReplicatedLog(){
243         return this.getRaftActorContext().getReplicatedLog();
244     }
245 }