f56638bc8223fa3593c92e67a59a4296068de1a2
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / MockRaftActor.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
4  *
5  * This program and the accompanying materials are made available under the
6  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7  * and is available at http://www.eclipse.org/legal/epl-v10.html
8  */
9 package org.opendaylight.controller.cluster.raft;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.mockito.Mockito.mock;
13 import akka.actor.ActorRef;
14 import akka.actor.Props;
15 import akka.japi.Creator;
16 import com.google.common.base.Optional;
17 import com.google.common.util.concurrent.Uninterruptibles;
18 import java.io.ByteArrayInputStream;
19 import java.io.IOException;
20 import java.io.ObjectInputStream;
21 import java.util.ArrayList;
22 import java.util.List;
23 import java.util.Map;
24 import java.util.concurrent.CountDownLatch;
25 import java.util.concurrent.TimeUnit;
26 import javax.annotation.Nonnull;
27 import org.opendaylight.controller.cluster.DataPersistenceProvider;
28 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
29 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
30
31 public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort {
32
33     public static final short PAYLOAD_VERSION = 5;
34
35     final RaftActor actorDelegate;
36     final RaftActorRecoveryCohort recoveryCohortDelegate;
37     volatile RaftActorSnapshotCohort snapshotCohortDelegate;
38     private final CountDownLatch recoveryComplete = new CountDownLatch(1);
39     private final List<Object> state;
40     private ActorRef roleChangeNotifier;
41     protected final CountDownLatch initializeBehaviorComplete = new CountDownLatch(1);
42     private RaftActorRecoverySupport raftActorRecoverySupport;
43     private RaftActorSnapshotMessageSupport snapshotMessageSupport;
44
45     public static final class MockRaftActorCreator implements Creator<MockRaftActor> {
46         private static final long serialVersionUID = 1L;
47         private final Map<String, String> peerAddresses;
48         private final String id;
49         private final Optional<ConfigParams> config;
50         private final DataPersistenceProvider dataPersistenceProvider;
51         private final ActorRef roleChangeNotifier;
52         private RaftActorSnapshotMessageSupport snapshotMessageSupport;
53
54         private MockRaftActorCreator(Map<String, String> peerAddresses, String id,
55             Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider,
56             ActorRef roleChangeNotifier) {
57             this.peerAddresses = peerAddresses;
58             this.id = id;
59             this.config = config;
60             this.dataPersistenceProvider = dataPersistenceProvider;
61             this.roleChangeNotifier = roleChangeNotifier;
62         }
63
64         @Override
65         public MockRaftActor create() throws Exception {
66             MockRaftActor mockRaftActor = new MockRaftActor(id, peerAddresses, config,
67                 dataPersistenceProvider);
68             mockRaftActor.roleChangeNotifier = this.roleChangeNotifier;
69             mockRaftActor.snapshotMessageSupport = snapshotMessageSupport;
70             return mockRaftActor;
71         }
72     }
73
74     public MockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
75                          DataPersistenceProvider dataPersistenceProvider) {
76         super(id, peerAddresses, config, PAYLOAD_VERSION);
77         state = new ArrayList<>();
78         this.actorDelegate = mock(RaftActor.class);
79         this.recoveryCohortDelegate = mock(RaftActorRecoveryCohort.class);
80         this.snapshotCohortDelegate = mock(RaftActorSnapshotCohort.class);
81         if(dataPersistenceProvider == null){
82             setPersistence(true);
83         } else {
84             setPersistence(dataPersistenceProvider);
85         }
86     }
87
88     public void setRaftActorRecoverySupport(RaftActorRecoverySupport support) {
89         raftActorRecoverySupport = support;
90     }
91
92     @Override
93     public RaftActorRecoverySupport newRaftActorRecoverySupport() {
94         return raftActorRecoverySupport != null ? raftActorRecoverySupport : super.newRaftActorRecoverySupport();
95     }
96
97     @Override
98     protected RaftActorSnapshotMessageSupport newRaftActorSnapshotMessageSupport() {
99         return snapshotMessageSupport != null ? snapshotMessageSupport :
100             (snapshotMessageSupport = super.newRaftActorSnapshotMessageSupport());
101     }
102
103     public RaftActorSnapshotMessageSupport getSnapshotMessageSupport() {
104         return snapshotMessageSupport;
105     }
106
107     public void waitForRecoveryComplete() {
108         try {
109             assertEquals("Recovery complete", true, recoveryComplete.await(5,  TimeUnit.SECONDS));
110         } catch (InterruptedException e) {
111             e.printStackTrace();
112         }
113     }
114
115     public void waitForInitializeBehaviorComplete() {
116         try {
117             assertEquals("Behavior initialized", true, initializeBehaviorComplete.await(5,  TimeUnit.SECONDS));
118         } catch (InterruptedException e) {
119             e.printStackTrace();
120         }
121     }
122
123
124     public void waitUntilLeader(){
125         for(int i = 0;i < 10; i++){
126             if(isLeader()){
127                 break;
128             }
129             Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
130         }
131     }
132
133     public List<Object> getState() {
134         return state;
135     }
136
137     public static Props props(final String id, final Map<String, String> peerAddresses,
138             Optional<ConfigParams> config){
139         return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, null));
140     }
141
142     public static Props props(final String id, final Map<String, String> peerAddresses,
143             Optional<ConfigParams> config, RaftActorSnapshotMessageSupport snapshotMessageSupport){
144         MockRaftActorCreator creator = new MockRaftActorCreator(peerAddresses, id, config, null, null);
145         creator.snapshotMessageSupport = snapshotMessageSupport;
146         return Props.create(creator);
147     }
148
149     public static Props props(final String id, final Map<String, String> peerAddresses,
150                               Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider){
151         return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, null));
152     }
153
154     public static Props props(final String id, final Map<String, String> peerAddresses,
155         Optional<ConfigParams> config, ActorRef roleChangeNotifier){
156         return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, roleChangeNotifier));
157     }
158
159     public static Props props(final String id, final Map<String, String> peerAddresses,
160                               Optional<ConfigParams> config, ActorRef roleChangeNotifier,
161                               DataPersistenceProvider dataPersistenceProvider){
162         return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, roleChangeNotifier));
163     }
164
165     @Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
166         actorDelegate.applyState(clientActor, identifier, data);
167         LOG.info("{}: applyState called: {}", persistenceId(), data);
168
169         state.add(data);
170     }
171
172     @Override
173     @Nonnull
174     protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
175         return this;
176     }
177
178     @Override
179     protected RaftActorSnapshotCohort getRaftActorSnapshotCohort() {
180         return this;
181     }
182
183     @Override
184     public void startLogRecoveryBatch(int maxBatchSize) {
185     }
186
187     @Override
188     public void appendRecoveredLogEntry(Payload data) {
189         state.add(data);
190     }
191
192     @Override
193     public void applyCurrentLogRecoveryBatch() {
194     }
195
196     @Override
197     protected void onRecoveryComplete() {
198         actorDelegate.onRecoveryComplete();
199         recoveryComplete.countDown();
200     }
201
202     @Override
203     protected void initializeBehavior() {
204         super.initializeBehavior();
205         initializeBehaviorComplete.countDown();
206     }
207
208     @Override
209     public void applyRecoverySnapshot(byte[] bytes) {
210         recoveryCohortDelegate.applyRecoverySnapshot(bytes);
211         applySnapshotBytes(bytes);
212     }
213
214     private void applySnapshotBytes(byte[] bytes) {
215         try {
216             Object data = toObject(bytes);
217             if (data instanceof List) {
218                 state.addAll((List<?>) data);
219             }
220         } catch (Exception e) {
221             e.printStackTrace();
222         }
223     }
224
225     @Override
226     public void createSnapshot(ActorRef actorRef) {
227         LOG.info("{}: createSnapshot called", persistenceId());
228         snapshotCohortDelegate.createSnapshot(actorRef);
229     }
230
231     @Override
232     public void applySnapshot(byte [] snapshot) {
233         LOG.info("{}: applySnapshot called", persistenceId());
234         snapshotCohortDelegate.applySnapshot(snapshot);
235         applySnapshotBytes(snapshot);
236     }
237
238     @Override
239     protected void onStateChanged() {
240         actorDelegate.onStateChanged();
241     }
242
243     @Override
244     protected Optional<ActorRef> getRoleChangeNotifier() {
245         return Optional.fromNullable(roleChangeNotifier);
246     }
247
248     @Override public String persistenceId() {
249         return this.getId();
250     }
251
252     protected void newBehavior(RaftActorBehavior newBehavior) {
253         self().tell(newBehavior, ActorRef.noSender());
254     }
255
256     @Override
257     public void handleCommand(final Object message) {
258         if(message instanceof RaftActorBehavior) {
259             super.changeCurrentBehavior((RaftActorBehavior)message);
260         } else {
261             super.handleCommand(message);
262         }
263     }
264
265     public static Object toObject(byte[] bs) throws ClassNotFoundException, IOException {
266         Object obj = null;
267         ByteArrayInputStream bis = null;
268         ObjectInputStream ois = null;
269         try {
270             bis = new ByteArrayInputStream(bs);
271             ois = new ObjectInputStream(bis);
272             obj = ois.readObject();
273         } finally {
274             if (bis != null) {
275                 bis.close();
276             }
277             if (ois != null) {
278                 ois.close();
279             }
280         }
281         return obj;
282     }
283
284     public ReplicatedLog getReplicatedLog(){
285         return this.getRaftActorContext().getReplicatedLog();
286     }
287 }