Bug 3020: Add leader version to LeaderStateChanged
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / MockRaftActor.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
4  *
5  * This program and the accompanying materials are made available under the
6  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7  * and is available at http://www.eclipse.org/legal/epl-v10.html
8  */
9 package org.opendaylight.controller.cluster.raft;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.mockito.Mockito.mock;
13 import akka.actor.ActorRef;
14 import akka.actor.Props;
15 import akka.japi.Creator;
16 import com.google.common.base.Optional;
17 import com.google.common.util.concurrent.Uninterruptibles;
18 import java.io.ByteArrayInputStream;
19 import java.io.IOException;
20 import java.io.ObjectInputStream;
21 import java.util.ArrayList;
22 import java.util.List;
23 import java.util.Map;
24 import java.util.concurrent.CountDownLatch;
25 import java.util.concurrent.TimeUnit;
26 import javax.annotation.Nonnull;
27 import org.opendaylight.controller.cluster.DataPersistenceProvider;
28 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
29
30 public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort {
31
32     public static final short PAYLOAD_VERSION = 5;
33
34     final RaftActor actorDelegate;
35     final RaftActorRecoveryCohort recoveryCohortDelegate;
36     final RaftActorSnapshotCohort snapshotCohortDelegate;
37     private final CountDownLatch recoveryComplete = new CountDownLatch(1);
38     private final List<Object> state;
39     private ActorRef roleChangeNotifier;
40     private final CountDownLatch initializeBehaviorComplete = new CountDownLatch(1);
41     private RaftActorRecoverySupport raftActorRecoverySupport;
42     private RaftActorSnapshotMessageSupport snapshotMessageSupport;
43
44     public static final class MockRaftActorCreator implements Creator<MockRaftActor> {
45         private static final long serialVersionUID = 1L;
46         private final Map<String, String> peerAddresses;
47         private final String id;
48         private final Optional<ConfigParams> config;
49         private final DataPersistenceProvider dataPersistenceProvider;
50         private final ActorRef roleChangeNotifier;
51         private RaftActorSnapshotMessageSupport snapshotMessageSupport;
52
53         private MockRaftActorCreator(Map<String, String> peerAddresses, String id,
54             Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider,
55             ActorRef roleChangeNotifier) {
56             this.peerAddresses = peerAddresses;
57             this.id = id;
58             this.config = config;
59             this.dataPersistenceProvider = dataPersistenceProvider;
60             this.roleChangeNotifier = roleChangeNotifier;
61         }
62
63         @Override
64         public MockRaftActor create() throws Exception {
65             MockRaftActor mockRaftActor = new MockRaftActor(id, peerAddresses, config,
66                 dataPersistenceProvider);
67             mockRaftActor.roleChangeNotifier = this.roleChangeNotifier;
68             mockRaftActor.snapshotMessageSupport = snapshotMessageSupport;
69             return mockRaftActor;
70         }
71     }
72
73     public MockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
74                          DataPersistenceProvider dataPersistenceProvider) {
75         super(id, peerAddresses, config, PAYLOAD_VERSION);
76         state = new ArrayList<>();
77         this.actorDelegate = mock(RaftActor.class);
78         this.recoveryCohortDelegate = mock(RaftActorRecoveryCohort.class);
79         this.snapshotCohortDelegate = mock(RaftActorSnapshotCohort.class);
80         if(dataPersistenceProvider == null){
81             setPersistence(true);
82         } else {
83             setPersistence(dataPersistenceProvider);
84         }
85     }
86
87     public void setRaftActorRecoverySupport(RaftActorRecoverySupport support) {
88         raftActorRecoverySupport = support;
89     }
90
91     @Override
92     public RaftActorRecoverySupport newRaftActorRecoverySupport() {
93         return raftActorRecoverySupport != null ? raftActorRecoverySupport : super.newRaftActorRecoverySupport();
94     }
95
96     @Override
97     protected RaftActorSnapshotMessageSupport newRaftActorSnapshotMessageSupport() {
98         return snapshotMessageSupport != null ? snapshotMessageSupport : super.newRaftActorSnapshotMessageSupport();
99     }
100
101     public void waitForRecoveryComplete() {
102         try {
103             assertEquals("Recovery complete", true, recoveryComplete.await(5,  TimeUnit.SECONDS));
104         } catch (InterruptedException e) {
105             e.printStackTrace();
106         }
107     }
108
109     public void waitForInitializeBehaviorComplete() {
110         try {
111             assertEquals("Behavior initialized", true, initializeBehaviorComplete.await(5,  TimeUnit.SECONDS));
112         } catch (InterruptedException e) {
113             e.printStackTrace();
114         }
115     }
116
117
118     public void waitUntilLeader(){
119         for(int i = 0;i < 10; i++){
120             if(isLeader()){
121                 break;
122             }
123             Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
124         }
125     }
126
127     public List<Object> getState() {
128         return state;
129     }
130
131     public static Props props(final String id, final Map<String, String> peerAddresses,
132             Optional<ConfigParams> config){
133         return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, null));
134     }
135
136     public static Props props(final String id, final Map<String, String> peerAddresses,
137             Optional<ConfigParams> config, RaftActorSnapshotMessageSupport snapshotMessageSupport){
138         MockRaftActorCreator creator = new MockRaftActorCreator(peerAddresses, id, config, null, null);
139         creator.snapshotMessageSupport = snapshotMessageSupport;
140         return Props.create(creator);
141     }
142
143     public static Props props(final String id, final Map<String, String> peerAddresses,
144                               Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider){
145         return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, null));
146     }
147
148     public static Props props(final String id, final Map<String, String> peerAddresses,
149         Optional<ConfigParams> config, ActorRef roleChangeNotifier){
150         return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, roleChangeNotifier));
151     }
152
153     public static Props props(final String id, final Map<String, String> peerAddresses,
154                               Optional<ConfigParams> config, ActorRef roleChangeNotifier,
155                               DataPersistenceProvider dataPersistenceProvider){
156         return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, roleChangeNotifier));
157     }
158
159     @Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
160         actorDelegate.applyState(clientActor, identifier, data);
161         LOG.info("{}: applyState called: {}", persistenceId(), data);
162
163         state.add(data);
164     }
165
166     @Override
167     @Nonnull
168     protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
169         return this;
170     }
171
172     @Override
173     protected RaftActorSnapshotCohort getRaftActorSnapshotCohort() {
174         return this;
175     }
176
177     @Override
178     public void startLogRecoveryBatch(int maxBatchSize) {
179     }
180
181     @Override
182     public void appendRecoveredLogEntry(Payload data) {
183         state.add(data);
184     }
185
186     @Override
187     public void applyCurrentLogRecoveryBatch() {
188     }
189
190     @Override
191     protected void onRecoveryComplete() {
192         actorDelegate.onRecoveryComplete();
193         recoveryComplete.countDown();
194     }
195
196     @Override
197     protected void initializeBehavior() {
198         super.initializeBehavior();
199         initializeBehaviorComplete.countDown();
200     }
201
202     @Override
203     public void applyRecoverySnapshot(byte[] bytes) {
204         recoveryCohortDelegate.applyRecoverySnapshot(bytes);
205         try {
206             Object data = toObject(bytes);
207             if (data instanceof List) {
208                 state.addAll((List<?>) data);
209             }
210         } catch (Exception e) {
211             e.printStackTrace();
212         }
213     }
214
215     @Override
216     public void createSnapshot(ActorRef actorRef) {
217         LOG.info("{}: createSnapshot called", persistenceId());
218         snapshotCohortDelegate.createSnapshot(actorRef);
219     }
220
221     @Override
222     public void applySnapshot(byte [] snapshot) {
223         LOG.info("{}: applySnapshot called", persistenceId());
224         snapshotCohortDelegate.applySnapshot(snapshot);
225     }
226
227     @Override
228     protected void onStateChanged() {
229         actorDelegate.onStateChanged();
230     }
231
232     @Override
233     protected Optional<ActorRef> getRoleChangeNotifier() {
234         return Optional.fromNullable(roleChangeNotifier);
235     }
236
237     @Override public String persistenceId() {
238         return this.getId();
239     }
240
241     public static Object toObject(byte[] bs) throws ClassNotFoundException, IOException {
242         Object obj = null;
243         ByteArrayInputStream bis = null;
244         ObjectInputStream ois = null;
245         try {
246             bis = new ByteArrayInputStream(bs);
247             ois = new ObjectInputStream(bis);
248             obj = ois.readObject();
249         } finally {
250             if (bis != null) {
251                 bis.close();
252             }
253             if (ois != null) {
254                 ois.close();
255             }
256         }
257         return obj;
258     }
259
260     public ReplicatedLog getReplicatedLog(){
261         return this.getRaftActorContext().getReplicatedLog();
262     }
263 }