bb39ed98ba3c9f835371826b090bc044b718504e
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / MockRaftActor.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
4  *
5  * This program and the accompanying materials are made available under the
6  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7  * and is available at http://www.eclipse.org/legal/epl-v10.html
8  */
9 package org.opendaylight.controller.cluster.raft;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.mockito.Mockito.mock;
13 import akka.actor.ActorRef;
14 import akka.actor.Props;
15 import akka.japi.Creator;
16 import com.google.common.base.Optional;
17 import com.google.common.util.concurrent.Uninterruptibles;
18 import java.io.ByteArrayInputStream;
19 import java.io.IOException;
20 import java.io.ObjectInputStream;
21 import java.util.ArrayList;
22 import java.util.List;
23 import java.util.Map;
24 import java.util.concurrent.CountDownLatch;
25 import java.util.concurrent.TimeUnit;
26 import javax.annotation.Nonnull;
27 import org.opendaylight.controller.cluster.DataPersistenceProvider;
28 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
29 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
30
31 public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort {
32
33     public static final short PAYLOAD_VERSION = 5;
34
35     final RaftActor actorDelegate;
36     final RaftActorRecoveryCohort recoveryCohortDelegate;
37     final RaftActorSnapshotCohort snapshotCohortDelegate;
38     private final CountDownLatch recoveryComplete = new CountDownLatch(1);
39     private final List<Object> state;
40     private ActorRef roleChangeNotifier;
41     protected final CountDownLatch initializeBehaviorComplete = new CountDownLatch(1);
42     private RaftActorRecoverySupport raftActorRecoverySupport;
43     private RaftActorSnapshotMessageSupport snapshotMessageSupport;
44
45     public static final class MockRaftActorCreator implements Creator<MockRaftActor> {
46         private static final long serialVersionUID = 1L;
47         private final Map<String, String> peerAddresses;
48         private final String id;
49         private final Optional<ConfigParams> config;
50         private final DataPersistenceProvider dataPersistenceProvider;
51         private final ActorRef roleChangeNotifier;
52         private RaftActorSnapshotMessageSupport snapshotMessageSupport;
53
54         private MockRaftActorCreator(Map<String, String> peerAddresses, String id,
55             Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider,
56             ActorRef roleChangeNotifier) {
57             this.peerAddresses = peerAddresses;
58             this.id = id;
59             this.config = config;
60             this.dataPersistenceProvider = dataPersistenceProvider;
61             this.roleChangeNotifier = roleChangeNotifier;
62         }
63
64         @Override
65         public MockRaftActor create() throws Exception {
66             MockRaftActor mockRaftActor = new MockRaftActor(id, peerAddresses, config,
67                 dataPersistenceProvider);
68             mockRaftActor.roleChangeNotifier = this.roleChangeNotifier;
69             mockRaftActor.snapshotMessageSupport = snapshotMessageSupport;
70             return mockRaftActor;
71         }
72     }
73
74     public MockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
75                          DataPersistenceProvider dataPersistenceProvider) {
76         super(id, peerAddresses, config, PAYLOAD_VERSION);
77         state = new ArrayList<>();
78         this.actorDelegate = mock(RaftActor.class);
79         this.recoveryCohortDelegate = mock(RaftActorRecoveryCohort.class);
80         this.snapshotCohortDelegate = mock(RaftActorSnapshotCohort.class);
81         if(dataPersistenceProvider == null){
82             setPersistence(true);
83         } else {
84             setPersistence(dataPersistenceProvider);
85         }
86     }
87
88     public void setRaftActorRecoverySupport(RaftActorRecoverySupport support) {
89         raftActorRecoverySupport = support;
90     }
91
92     @Override
93     public RaftActorRecoverySupport newRaftActorRecoverySupport() {
94         return raftActorRecoverySupport != null ? raftActorRecoverySupport : super.newRaftActorRecoverySupport();
95     }
96
97     @Override
98     protected RaftActorSnapshotMessageSupport newRaftActorSnapshotMessageSupport() {
99         return snapshotMessageSupport != null ? snapshotMessageSupport : super.newRaftActorSnapshotMessageSupport();
100     }
101
102     public void waitForRecoveryComplete() {
103         try {
104             assertEquals("Recovery complete", true, recoveryComplete.await(5,  TimeUnit.SECONDS));
105         } catch (InterruptedException e) {
106             e.printStackTrace();
107         }
108     }
109
110     public void waitForInitializeBehaviorComplete() {
111         try {
112             assertEquals("Behavior initialized", true, initializeBehaviorComplete.await(5,  TimeUnit.SECONDS));
113         } catch (InterruptedException e) {
114             e.printStackTrace();
115         }
116     }
117
118
119     public void waitUntilLeader(){
120         for(int i = 0;i < 10; i++){
121             if(isLeader()){
122                 break;
123             }
124             Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
125         }
126     }
127
128     public List<Object> getState() {
129         return state;
130     }
131
132     public static Props props(final String id, final Map<String, String> peerAddresses,
133             Optional<ConfigParams> config){
134         return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, null));
135     }
136
137     public static Props props(final String id, final Map<String, String> peerAddresses,
138             Optional<ConfigParams> config, RaftActorSnapshotMessageSupport snapshotMessageSupport){
139         MockRaftActorCreator creator = new MockRaftActorCreator(peerAddresses, id, config, null, null);
140         creator.snapshotMessageSupport = snapshotMessageSupport;
141         return Props.create(creator);
142     }
143
144     public static Props props(final String id, final Map<String, String> peerAddresses,
145                               Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider){
146         return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, null));
147     }
148
149     public static Props props(final String id, final Map<String, String> peerAddresses,
150         Optional<ConfigParams> config, ActorRef roleChangeNotifier){
151         return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, roleChangeNotifier));
152     }
153
154     public static Props props(final String id, final Map<String, String> peerAddresses,
155                               Optional<ConfigParams> config, ActorRef roleChangeNotifier,
156                               DataPersistenceProvider dataPersistenceProvider){
157         return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, roleChangeNotifier));
158     }
159
160     @Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
161         actorDelegate.applyState(clientActor, identifier, data);
162         LOG.info("{}: applyState called: {}", persistenceId(), data);
163
164         state.add(data);
165     }
166
167     @Override
168     @Nonnull
169     protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
170         return this;
171     }
172
173     @Override
174     protected RaftActorSnapshotCohort getRaftActorSnapshotCohort() {
175         return this;
176     }
177
178     @Override
179     public void startLogRecoveryBatch(int maxBatchSize) {
180     }
181
182     @Override
183     public void appendRecoveredLogEntry(Payload data) {
184         state.add(data);
185     }
186
187     @Override
188     public void applyCurrentLogRecoveryBatch() {
189     }
190
191     @Override
192     protected void onRecoveryComplete() {
193         actorDelegate.onRecoveryComplete();
194         recoveryComplete.countDown();
195     }
196
197     @Override
198     protected void initializeBehavior() {
199         super.initializeBehavior();
200         initializeBehaviorComplete.countDown();
201     }
202
203     @Override
204     public void applyRecoverySnapshot(byte[] bytes) {
205         recoveryCohortDelegate.applyRecoverySnapshot(bytes);
206         applySnapshotBytes(bytes);
207     }
208
209     private void applySnapshotBytes(byte[] bytes) {
210         try {
211             Object data = toObject(bytes);
212             if (data instanceof List) {
213                 state.addAll((List<?>) data);
214             }
215         } catch (Exception e) {
216             e.printStackTrace();
217         }
218     }
219
220     @Override
221     public void createSnapshot(ActorRef actorRef) {
222         LOG.info("{}: createSnapshot called", persistenceId());
223         snapshotCohortDelegate.createSnapshot(actorRef);
224     }
225
226     @Override
227     public void applySnapshot(byte [] snapshot) {
228         LOG.info("{}: applySnapshot called", persistenceId());
229         snapshotCohortDelegate.applySnapshot(snapshot);
230         applySnapshotBytes(snapshot);
231     }
232
233     @Override
234     protected void onStateChanged() {
235         actorDelegate.onStateChanged();
236     }
237
238     @Override
239     protected Optional<ActorRef> getRoleChangeNotifier() {
240         return Optional.fromNullable(roleChangeNotifier);
241     }
242
243     @Override public String persistenceId() {
244         return this.getId();
245     }
246
247     protected void newBehavior(RaftActorBehavior newBehavior) {
248         self().tell(newBehavior, ActorRef.noSender());
249     }
250
251     @Override
252     public void handleCommand(final Object message) {
253         if(message instanceof RaftActorBehavior) {
254             super.changeCurrentBehavior((RaftActorBehavior)message);
255         } else {
256             super.handleCommand(message);
257         }
258     }
259
260     public static Object toObject(byte[] bs) throws ClassNotFoundException, IOException {
261         Object obj = null;
262         ByteArrayInputStream bis = null;
263         ObjectInputStream ois = null;
264         try {
265             bis = new ByteArrayInputStream(bs);
266             ois = new ObjectInputStream(bis);
267             obj = ois.readObject();
268         } finally {
269             if (bis != null) {
270                 bis.close();
271             }
272             if (ois != null) {
273                 ois.close();
274             }
275         }
276         return obj;
277     }
278
279     public ReplicatedLog getReplicatedLog(){
280         return this.getRaftActorContext().getReplicatedLog();
281     }
282 }