Bug 2948: Recovered log entries not applied after prior snapshot
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / MockRaftActor.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
4  *
5  * This program and the accompanying materials are made available under the
6  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7  * and is available at http://www.eclipse.org/legal/epl-v10.html
8  */
9 package org.opendaylight.controller.cluster.raft;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.mockito.Mockito.mock;
13 import akka.actor.ActorRef;
14 import akka.actor.Props;
15 import akka.japi.Creator;
16 import com.google.common.base.Optional;
17 import com.google.common.util.concurrent.Uninterruptibles;
18 import java.io.ByteArrayInputStream;
19 import java.io.IOException;
20 import java.io.ObjectInputStream;
21 import java.util.ArrayList;
22 import java.util.List;
23 import java.util.Map;
24 import java.util.concurrent.CountDownLatch;
25 import java.util.concurrent.TimeUnit;
26 import javax.annotation.Nonnull;
27 import org.opendaylight.controller.cluster.DataPersistenceProvider;
28 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
29
30 public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort {
31
32     final RaftActor actorDelegate;
33     final RaftActorRecoveryCohort recoveryCohortDelegate;
34     final RaftActorSnapshotCohort snapshotCohortDelegate;
35     private final CountDownLatch recoveryComplete = new CountDownLatch(1);
36     private final List<Object> state;
37     private ActorRef roleChangeNotifier;
38     private final CountDownLatch initializeBehaviorComplete = new CountDownLatch(1);
39     private RaftActorRecoverySupport raftActorRecoverySupport;
40     private RaftActorSnapshotMessageSupport snapshotMessageSupport;
41
42     public static final class MockRaftActorCreator implements Creator<MockRaftActor> {
43         private static final long serialVersionUID = 1L;
44         private final Map<String, String> peerAddresses;
45         private final String id;
46         private final Optional<ConfigParams> config;
47         private final DataPersistenceProvider dataPersistenceProvider;
48         private final ActorRef roleChangeNotifier;
49         private RaftActorSnapshotMessageSupport snapshotMessageSupport;
50
51         private MockRaftActorCreator(Map<String, String> peerAddresses, String id,
52             Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider,
53             ActorRef roleChangeNotifier) {
54             this.peerAddresses = peerAddresses;
55             this.id = id;
56             this.config = config;
57             this.dataPersistenceProvider = dataPersistenceProvider;
58             this.roleChangeNotifier = roleChangeNotifier;
59         }
60
61         @Override
62         public MockRaftActor create() throws Exception {
63             MockRaftActor mockRaftActor = new MockRaftActor(id, peerAddresses, config,
64                 dataPersistenceProvider);
65             mockRaftActor.roleChangeNotifier = this.roleChangeNotifier;
66             mockRaftActor.snapshotMessageSupport = snapshotMessageSupport;
67             return mockRaftActor;
68         }
69     }
70
71     public MockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
72                          DataPersistenceProvider dataPersistenceProvider) {
73         super(id, peerAddresses, config);
74         state = new ArrayList<>();
75         this.actorDelegate = mock(RaftActor.class);
76         this.recoveryCohortDelegate = mock(RaftActorRecoveryCohort.class);
77         this.snapshotCohortDelegate = mock(RaftActorSnapshotCohort.class);
78         if(dataPersistenceProvider == null){
79             setPersistence(true);
80         } else {
81             setPersistence(dataPersistenceProvider);
82         }
83     }
84
85     public void setRaftActorRecoverySupport(RaftActorRecoverySupport support) {
86         raftActorRecoverySupport = support;
87     }
88
89     @Override
90     public RaftActorRecoverySupport newRaftActorRecoverySupport() {
91         return raftActorRecoverySupport != null ? raftActorRecoverySupport : super.newRaftActorRecoverySupport();
92     }
93
94     @Override
95     protected RaftActorSnapshotMessageSupport newRaftActorSnapshotMessageSupport() {
96         return snapshotMessageSupport != null ? snapshotMessageSupport : super.newRaftActorSnapshotMessageSupport();
97     }
98
99     public void waitForRecoveryComplete() {
100         try {
101             assertEquals("Recovery complete", true, recoveryComplete.await(5,  TimeUnit.SECONDS));
102         } catch (InterruptedException e) {
103             e.printStackTrace();
104         }
105     }
106
107     public void waitForInitializeBehaviorComplete() {
108         try {
109             assertEquals("Behavior initialized", true, initializeBehaviorComplete.await(5,  TimeUnit.SECONDS));
110         } catch (InterruptedException e) {
111             e.printStackTrace();
112         }
113     }
114
115
116     public void waitUntilLeader(){
117         for(int i = 0;i < 10; i++){
118             if(isLeader()){
119                 break;
120             }
121             Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
122         }
123     }
124
125     public List<Object> getState() {
126         return state;
127     }
128
129     public static Props props(final String id, final Map<String, String> peerAddresses,
130             Optional<ConfigParams> config){
131         return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, null));
132     }
133
134     public static Props props(final String id, final Map<String, String> peerAddresses,
135             Optional<ConfigParams> config, RaftActorSnapshotMessageSupport snapshotMessageSupport){
136         MockRaftActorCreator creator = new MockRaftActorCreator(peerAddresses, id, config, null, null);
137         creator.snapshotMessageSupport = snapshotMessageSupport;
138         return Props.create(creator);
139     }
140
141     public static Props props(final String id, final Map<String, String> peerAddresses,
142                               Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider){
143         return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, null));
144     }
145
146     public static Props props(final String id, final Map<String, String> peerAddresses,
147         Optional<ConfigParams> config, ActorRef roleChangeNotifier){
148         return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, roleChangeNotifier));
149     }
150
151     public static Props props(final String id, final Map<String, String> peerAddresses,
152                               Optional<ConfigParams> config, ActorRef roleChangeNotifier,
153                               DataPersistenceProvider dataPersistenceProvider){
154         return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, roleChangeNotifier));
155     }
156
157     @Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
158         actorDelegate.applyState(clientActor, identifier, data);
159         LOG.info("{}: applyState called: {}", persistenceId(), data);
160
161         state.add(data);
162     }
163
164     @Override
165     @Nonnull
166     protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
167         return this;
168     }
169
170     @Override
171     protected RaftActorSnapshotCohort getRaftActorSnapshotCohort() {
172         return this;
173     }
174
175     @Override
176     public void startLogRecoveryBatch(int maxBatchSize) {
177     }
178
179     @Override
180     public void appendRecoveredLogEntry(Payload data) {
181         state.add(data);
182     }
183
184     @Override
185     public void applyCurrentLogRecoveryBatch() {
186     }
187
188     @Override
189     protected void onRecoveryComplete() {
190         actorDelegate.onRecoveryComplete();
191         recoveryComplete.countDown();
192     }
193
194     @Override
195     protected void initializeBehavior() {
196         super.initializeBehavior();
197         initializeBehaviorComplete.countDown();
198     }
199
200     @Override
201     public void applyRecoverySnapshot(byte[] bytes) {
202         recoveryCohortDelegate.applyRecoverySnapshot(bytes);
203         try {
204             Object data = toObject(bytes);
205             if (data instanceof List) {
206                 state.addAll((List<?>) data);
207             }
208         } catch (Exception e) {
209             e.printStackTrace();
210         }
211     }
212
213     @Override
214     public void createSnapshot(ActorRef actorRef) {
215         LOG.info("{}: createSnapshot called", persistenceId());
216         snapshotCohortDelegate.createSnapshot(actorRef);
217     }
218
219     @Override
220     public void applySnapshot(byte [] snapshot) {
221         LOG.info("{}: applySnapshot called", persistenceId());
222         snapshotCohortDelegate.applySnapshot(snapshot);
223     }
224
225     @Override
226     protected void onStateChanged() {
227         actorDelegate.onStateChanged();
228     }
229
230     @Override
231     protected Optional<ActorRef> getRoleChangeNotifier() {
232         return Optional.fromNullable(roleChangeNotifier);
233     }
234
235     @Override public String persistenceId() {
236         return this.getId();
237     }
238
239     public static Object toObject(byte[] bs) throws ClassNotFoundException, IOException {
240         Object obj = null;
241         ByteArrayInputStream bis = null;
242         ObjectInputStream ois = null;
243         try {
244             bis = new ByteArrayInputStream(bs);
245             ois = new ObjectInputStream(bis);
246             obj = ois.readObject();
247         } finally {
248             if (bis != null) {
249                 bis.close();
250             }
251             if (ois != null) {
252                 ois.close();
253             }
254         }
255         return obj;
256     }
257
258     public ReplicatedLog getReplicatedLog(){
259         return this.getRaftActorContext().getReplicatedLog();
260     }
261 }