BUG 2185 : Introduce the SwitchBehavior message
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RaftActorTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertNull;
14 import static org.junit.Assert.assertTrue;
15 import static org.mockito.Matchers.any;
16 import static org.mockito.Matchers.anyObject;
17 import static org.mockito.Matchers.eq;
18 import static org.mockito.Matchers.same;
19 import static org.mockito.Mockito.doReturn;
20 import static org.mockito.Mockito.mock;
21 import static org.mockito.Mockito.times;
22 import static org.mockito.Mockito.verify;
23 import akka.actor.ActorRef;
24 import akka.actor.PoisonPill;
25 import akka.actor.Props;
26 import akka.actor.Terminated;
27 import akka.japi.Procedure;
28 import akka.persistence.RecoveryCompleted;
29 import akka.persistence.SaveSnapshotFailure;
30 import akka.persistence.SaveSnapshotSuccess;
31 import akka.persistence.SnapshotMetadata;
32 import akka.persistence.SnapshotOffer;
33 import akka.testkit.JavaTestKit;
34 import akka.testkit.TestActorRef;
35 import com.google.common.base.Optional;
36 import com.google.common.collect.ImmutableMap;
37 import com.google.common.util.concurrent.Uninterruptibles;
38 import com.google.protobuf.ByteString;
39 import java.io.ByteArrayOutputStream;
40 import java.io.ObjectOutputStream;
41 import java.util.ArrayList;
42 import java.util.Arrays;
43 import java.util.Collections;
44 import java.util.HashMap;
45 import java.util.List;
46 import java.util.Map;
47 import java.util.concurrent.TimeUnit;
48 import org.junit.After;
49 import org.junit.Before;
50 import org.junit.Test;
51 import org.opendaylight.controller.cluster.DataPersistenceProvider;
52 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
53 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
54 import org.opendaylight.controller.cluster.notifications.RoleChanged;
55 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
56 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
57 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
58 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
59 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
60 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
61 import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries;
62 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
63 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
64 import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
65 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
66 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
67 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
68 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
69 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
70 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
71 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
72 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
73 import org.slf4j.Logger;
74 import org.slf4j.LoggerFactory;
75 import scala.concurrent.duration.FiniteDuration;
76
77 public class RaftActorTest extends AbstractActorTest {
78
79     static final Logger TEST_LOG = LoggerFactory.getLogger(RaftActorTest.class);
80
81     private TestActorFactory factory;
82
83     @Before
84     public void setUp(){
85         factory = new TestActorFactory(getSystem());
86     }
87
88     @After
89     public void tearDown() throws Exception {
90         factory.close();
91         InMemoryJournal.clear();
92         InMemorySnapshotStore.clear();
93     }
94
95     @Test
96     public void testConstruction() {
97         new RaftActorTestKit(getSystem(), "testConstruction").waitUntilLeader();
98     }
99
100     @Test
101     public void testFindLeaderWhenLeaderIsSelf(){
102         RaftActorTestKit kit = new RaftActorTestKit(getSystem(), "testFindLeader");
103         kit.waitUntilLeader();
104     }
105
106     @Test
107     public void testRaftActorRecoveryWithPersistenceEnabled() throws Exception {
108         TEST_LOG.info("testRaftActorRecoveryWithPersistenceEnabled starting");
109
110         new JavaTestKit(getSystem()) {{
111             String persistenceId = factory.generateActorId("follower-");
112
113             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
114
115             // Set the heartbeat interval high to essentially disable election otherwise the test
116             // may fail if the actor is switched to Leader and the commitIndex is set to the last
117             // log entry.
118             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
119
120             ImmutableMap<String, String> peerAddresses = ImmutableMap.<String, String>builder().put("member1", "address").build();
121             ActorRef followerActor = factory.createActor(MockRaftActor.props(persistenceId,
122                     peerAddresses, Optional.<ConfigParams>of(config)), persistenceId);
123
124             watch(followerActor);
125
126             List<ReplicatedLogEntry> snapshotUnappliedEntries = new ArrayList<>();
127             ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 4,
128                     new MockRaftActorContext.MockPayload("E"));
129             snapshotUnappliedEntries.add(entry1);
130
131             int lastAppliedDuringSnapshotCapture = 3;
132             int lastIndexDuringSnapshotCapture = 4;
133
134             // 4 messages as part of snapshot, which are applied to state
135             ByteString snapshotBytes = fromObject(Arrays.asList(
136                     new MockRaftActorContext.MockPayload("A"),
137                     new MockRaftActorContext.MockPayload("B"),
138                     new MockRaftActorContext.MockPayload("C"),
139                     new MockRaftActorContext.MockPayload("D")));
140
141             Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
142                     snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1,
143                     lastAppliedDuringSnapshotCapture, 1);
144             InMemorySnapshotStore.addSnapshot(persistenceId, snapshot);
145
146             // add more entries after snapshot is taken
147             List<ReplicatedLogEntry> entries = new ArrayList<>();
148             ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
149                     new MockRaftActorContext.MockPayload("F", 2));
150             ReplicatedLogEntry entry3 = new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
151                     new MockRaftActorContext.MockPayload("G", 3));
152             ReplicatedLogEntry entry4 = new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
153                     new MockRaftActorContext.MockPayload("H", 4));
154             entries.add(entry2);
155             entries.add(entry3);
156             entries.add(entry4);
157
158             int lastAppliedToState = 5;
159             int lastIndex = 7;
160
161             InMemoryJournal.addEntry(persistenceId, 5, entry2);
162             // 2 entries are applied to state besides the 4 entries in snapshot
163             InMemoryJournal.addEntry(persistenceId, 6, new ApplyJournalEntries(lastAppliedToState));
164             InMemoryJournal.addEntry(persistenceId, 7, entry3);
165             InMemoryJournal.addEntry(persistenceId, 8, entry4);
166
167             // kill the actor
168             followerActor.tell(PoisonPill.getInstance(), null);
169             expectMsgClass(duration("5 seconds"), Terminated.class);
170
171             unwatch(followerActor);
172
173             //reinstate the actor
174             TestActorRef<MockRaftActor> ref = factory.createTestActor(
175                     MockRaftActor.props(persistenceId, peerAddresses, Optional.<ConfigParams>of(config)));
176
177             MockRaftActor mockRaftActor = ref.underlyingActor();
178
179             mockRaftActor.waitForRecoveryComplete();
180
181             RaftActorContext context = mockRaftActor.getRaftActorContext();
182             assertEquals("Journal log size", snapshotUnappliedEntries.size() + entries.size(),
183                     context.getReplicatedLog().size());
184             assertEquals("Journal data size", 10, context.getReplicatedLog().dataSize());
185             assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
186             assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
187             assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
188             assertEquals("Recovered state size", 6, mockRaftActor.getState().size());
189
190             mockRaftActor.waitForInitializeBehaviorComplete();
191
192             assertEquals("getRaftState", RaftState.Follower, mockRaftActor.getRaftState());
193         }};
194
195         TEST_LOG.info("testRaftActorRecoveryWithPersistenceEnabled ending");
196     }
197
198     @Test
199     public void testRaftActorRecoveryWithPersistenceDisabled() throws Exception {
200         new JavaTestKit(getSystem()) {{
201             String persistenceId = factory.generateActorId("follower-");
202
203             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
204
205             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
206
207             TestActorRef<MockRaftActor> ref = factory.createTestActor(MockRaftActor.props(persistenceId,
208                     ImmutableMap.<String, String>builder().put("member1", "address").build(),
209                     Optional.<ConfigParams>of(config), new NonPersistentDataProvider()), persistenceId);
210
211             MockRaftActor mockRaftActor = ref.underlyingActor();
212
213             mockRaftActor.waitForRecoveryComplete();
214
215             mockRaftActor.waitForInitializeBehaviorComplete();
216
217             assertEquals("getRaftState", RaftState.Follower, mockRaftActor.getRaftState());
218         }};
219     }
220
221     @Test
222     public void testRaftActorForwardsToRaftActorRecoverySupport() {
223         String persistenceId = factory.generateActorId("leader-");
224
225         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
226
227         config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
228
229         TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
230                 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config)), persistenceId);
231
232         MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
233
234         // Wait for akka's recovery to complete so it doesn't interfere.
235         mockRaftActor.waitForRecoveryComplete();
236
237         RaftActorRecoverySupport mockSupport = mock(RaftActorRecoverySupport.class);
238         mockRaftActor.setRaftActorRecoverySupport(mockSupport );
239
240         Snapshot snapshot = Snapshot.create(new byte[]{1}, Collections.<ReplicatedLogEntry>emptyList(), 3, 1, 3, 1);
241         SnapshotOffer snapshotOffer = new SnapshotOffer(new SnapshotMetadata("test", 6, 12345), snapshot);
242         mockRaftActor.handleRecover(snapshotOffer);
243
244         MockRaftActorContext.MockReplicatedLogEntry logEntry = new MockRaftActorContext.MockReplicatedLogEntry(1,
245                 1, new MockRaftActorContext.MockPayload("1", 5));
246         mockRaftActor.handleRecover(logEntry);
247
248         ApplyJournalEntries applyJournalEntries = new ApplyJournalEntries(2);
249         mockRaftActor.handleRecover(applyJournalEntries);
250
251         ApplyLogEntries applyLogEntries = new ApplyLogEntries(0);
252         mockRaftActor.handleRecover(applyLogEntries);
253
254         DeleteEntries deleteEntries = new DeleteEntries(1);
255         mockRaftActor.handleRecover(deleteEntries);
256
257         org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries deprecatedDeleteEntries =
258                 new org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries(1);
259         mockRaftActor.handleRecover(deprecatedDeleteEntries);
260
261         UpdateElectionTerm updateElectionTerm = new UpdateElectionTerm(5, "member2");
262         mockRaftActor.handleRecover(updateElectionTerm);
263
264         org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm deprecatedUpdateElectionTerm =
265                 new org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm(6, "member3");
266         mockRaftActor.handleRecover(deprecatedUpdateElectionTerm);
267
268         verify(mockSupport).handleRecoveryMessage(same(snapshotOffer));
269         verify(mockSupport).handleRecoveryMessage(same(logEntry));
270         verify(mockSupport).handleRecoveryMessage(same(applyJournalEntries));
271         verify(mockSupport).handleRecoveryMessage(same(applyLogEntries));
272         verify(mockSupport).handleRecoveryMessage(same(deleteEntries));
273         verify(mockSupport).handleRecoveryMessage(same(deprecatedDeleteEntries));
274         verify(mockSupport).handleRecoveryMessage(same(updateElectionTerm));
275         verify(mockSupport).handleRecoveryMessage(same(deprecatedUpdateElectionTerm));
276     }
277
278     @Test
279     public void testRaftActorForwardsToRaftActorSnapshotMessageSupport() {
280         String persistenceId = factory.generateActorId("leader-");
281
282         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
283
284         config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
285
286         RaftActorSnapshotMessageSupport mockSupport = mock(RaftActorSnapshotMessageSupport.class);
287
288         TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
289                 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), mockSupport), persistenceId);
290
291         MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
292
293         // Wait for akka's recovery to complete so it doesn't interfere.
294         mockRaftActor.waitForRecoveryComplete();
295
296         ApplySnapshot applySnapshot = new ApplySnapshot(mock(Snapshot.class));
297         doReturn(true).when(mockSupport).handleSnapshotMessage(same(applySnapshot));
298         mockRaftActor.handleCommand(applySnapshot);
299
300         CaptureSnapshot captureSnapshot = new CaptureSnapshot(1, 1, 1, 1, 0, 1, null);
301         doReturn(true).when(mockSupport).handleSnapshotMessage(same(captureSnapshot));
302         mockRaftActor.handleCommand(captureSnapshot);
303
304         CaptureSnapshotReply captureSnapshotReply = new CaptureSnapshotReply(new byte[0]);
305         doReturn(true).when(mockSupport).handleSnapshotMessage(same(captureSnapshotReply));
306         mockRaftActor.handleCommand(captureSnapshotReply);
307
308         SaveSnapshotSuccess saveSnapshotSuccess = new SaveSnapshotSuccess(mock(SnapshotMetadata.class));
309         doReturn(true).when(mockSupport).handleSnapshotMessage(same(saveSnapshotSuccess));
310         mockRaftActor.handleCommand(saveSnapshotSuccess);
311
312         SaveSnapshotFailure saveSnapshotFailure = new SaveSnapshotFailure(mock(SnapshotMetadata.class), new Throwable());
313         doReturn(true).when(mockSupport).handleSnapshotMessage(same(saveSnapshotFailure));
314         mockRaftActor.handleCommand(saveSnapshotFailure);
315
316         doReturn(true).when(mockSupport).handleSnapshotMessage(same(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT));
317         mockRaftActor.handleCommand(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT);
318
319         verify(mockSupport).handleSnapshotMessage(same(applySnapshot));
320         verify(mockSupport).handleSnapshotMessage(same(captureSnapshot));
321         verify(mockSupport).handleSnapshotMessage(same(captureSnapshotReply));
322         verify(mockSupport).handleSnapshotMessage(same(saveSnapshotSuccess));
323         verify(mockSupport).handleSnapshotMessage(same(saveSnapshotFailure));
324         verify(mockSupport).handleSnapshotMessage(same(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT));
325     }
326
327     @Test
328     public void testApplyJournalEntriesCallsDataPersistence() throws Exception {
329         new JavaTestKit(getSystem()) {
330             {
331                 String persistenceId = factory.generateActorId("leader-");
332
333                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
334
335                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
336
337                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
338
339                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
340                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
341
342                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
343
344                 mockRaftActor.waitForInitializeBehaviorComplete();
345
346                 mockRaftActor.waitUntilLeader();
347
348                 mockRaftActor.onReceiveCommand(new ApplyJournalEntries(10));
349
350                 verify(dataPersistenceProvider, times(2)).persist(anyObject(), any(Procedure.class));
351
352             }
353
354         };
355     }
356
357     @Test
358     public void testApplyState() throws Exception {
359
360         new JavaTestKit(getSystem()) {
361             {
362                 String persistenceId = factory.generateActorId("leader-");
363
364                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
365
366                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
367
368                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
369
370                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
371                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
372
373                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
374
375                 mockRaftActor.waitForInitializeBehaviorComplete();
376
377                 ReplicatedLogEntry entry = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
378                         new MockRaftActorContext.MockPayload("F"));
379
380                 mockRaftActor.onReceiveCommand(new ApplyState(mockActorRef, "apply-state", entry));
381
382                 verify(mockRaftActor.actorDelegate).applyState(eq(mockActorRef), eq("apply-state"), anyObject());
383
384             }
385         };
386     }
387
388     @Test
389     public void testRaftRoleChangeNotifierWhenRaftActorHasNoPeers() throws Exception {
390         new JavaTestKit(getSystem()) {{
391             TestActorRef<MessageCollectorActor> notifierActor = factory.createTestActor(
392                     Props.create(MessageCollectorActor.class));
393             MessageCollectorActor.waitUntilReady(notifierActor);
394
395             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
396             long heartBeatInterval = 100;
397             config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS));
398             config.setElectionTimeoutFactor(20);
399
400             String persistenceId = factory.generateActorId("notifier-");
401
402             TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
403                     Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), notifierActor,
404                     new NonPersistentDataProvider()), persistenceId);
405
406             List<RoleChanged> matches =  MessageCollectorActor.expectMatching(notifierActor, RoleChanged.class, 3);
407
408
409             // check if the notifier got a role change from null to Follower
410             RoleChanged raftRoleChanged = matches.get(0);
411             assertEquals(persistenceId, raftRoleChanged.getMemberId());
412             assertNull(raftRoleChanged.getOldRole());
413             assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
414
415             // check if the notifier got a role change from Follower to Candidate
416             raftRoleChanged = matches.get(1);
417             assertEquals(persistenceId, raftRoleChanged.getMemberId());
418             assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
419             assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
420
421             // check if the notifier got a role change from Candidate to Leader
422             raftRoleChanged = matches.get(2);
423             assertEquals(persistenceId, raftRoleChanged.getMemberId());
424             assertEquals(RaftState.Candidate.name(), raftRoleChanged.getOldRole());
425             assertEquals(RaftState.Leader.name(), raftRoleChanged.getNewRole());
426
427             LeaderStateChanged leaderStateChange = MessageCollectorActor.expectFirstMatching(
428                     notifierActor, LeaderStateChanged.class);
429
430             assertEquals(raftRoleChanged.getMemberId(), leaderStateChange.getLeaderId());
431             assertEquals(MockRaftActor.PAYLOAD_VERSION, leaderStateChange.getLeaderPayloadVersion());
432
433             notifierActor.underlyingActor().clear();
434
435             MockRaftActor raftActor = raftActorRef.underlyingActor();
436             final String newLeaderId = "new-leader";
437             final short newLeaderVersion = 6;
438             Follower follower = new Follower(raftActor.getRaftActorContext()) {
439                 @Override
440                 public RaftActorBehavior handleMessage(ActorRef sender, Object message) {
441                     leaderId = newLeaderId;
442                     setLeaderPayloadVersion(newLeaderVersion);
443                     return this;
444                 }
445             };
446
447             raftActor.changeCurrentBehavior(follower);
448
449             leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
450             assertEquals(persistenceId, leaderStateChange.getMemberId());
451             assertEquals(null, leaderStateChange.getLeaderId());
452
453             raftRoleChanged = MessageCollectorActor.expectFirstMatching(notifierActor, RoleChanged.class);
454             assertEquals(RaftState.Leader.name(), raftRoleChanged.getOldRole());
455             assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
456
457             notifierActor.underlyingActor().clear();
458
459             raftActor.handleCommand("any");
460
461             leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
462             assertEquals(persistenceId, leaderStateChange.getMemberId());
463             assertEquals(newLeaderId, leaderStateChange.getLeaderId());
464             assertEquals(newLeaderVersion, leaderStateChange.getLeaderPayloadVersion());
465
466             notifierActor.underlyingActor().clear();
467
468             raftActor.handleCommand("any");
469
470             Uninterruptibles.sleepUninterruptibly(505, TimeUnit.MILLISECONDS);
471             leaderStateChange = MessageCollectorActor.getFirstMatching(notifierActor, LeaderStateChanged.class);
472             assertNull(leaderStateChange);
473         }};
474     }
475
476     @Test
477     public void testRaftRoleChangeNotifierWhenRaftActorHasPeers() throws Exception {
478         new JavaTestKit(getSystem()) {{
479             ActorRef notifierActor = factory.createActor(Props.create(MessageCollectorActor.class));
480             MessageCollectorActor.waitUntilReady(notifierActor);
481
482             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
483             long heartBeatInterval = 100;
484             config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS));
485             config.setElectionTimeoutFactor(1);
486
487             String persistenceId = factory.generateActorId("notifier-");
488
489             factory.createActor(MockRaftActor.props(persistenceId,
490                     ImmutableMap.of("leader", "fake/path"), Optional.<ConfigParams>of(config), notifierActor), persistenceId);
491
492             List<RoleChanged> matches =  null;
493             for(int i = 0; i < 5000 / heartBeatInterval; i++) {
494                 matches = MessageCollectorActor.getAllMatching(notifierActor, RoleChanged.class);
495                 assertNotNull(matches);
496                 if(matches.size() == 3) {
497                     break;
498                 }
499                 Uninterruptibles.sleepUninterruptibly(heartBeatInterval, TimeUnit.MILLISECONDS);
500             }
501
502             assertNotNull(matches);
503             assertEquals(2, matches.size());
504
505             // check if the notifier got a role change from null to Follower
506             RoleChanged raftRoleChanged = matches.get(0);
507             assertEquals(persistenceId, raftRoleChanged.getMemberId());
508             assertNull(raftRoleChanged.getOldRole());
509             assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
510
511             // check if the notifier got a role change from Follower to Candidate
512             raftRoleChanged = matches.get(1);
513             assertEquals(persistenceId, raftRoleChanged.getMemberId());
514             assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
515             assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
516
517         }};
518     }
519
520     @Test
521     public void testFakeSnapshotsForLeaderWithInRealSnapshots() throws Exception {
522         new JavaTestKit(getSystem()) {
523             {
524                 String persistenceId = factory.generateActorId("leader-");
525                 String follower1Id = factory.generateActorId("follower-");
526
527                 ActorRef followerActor1 =
528                         factory.createActor(Props.create(MessageCollectorActor.class));
529
530                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
531                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
532                 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
533
534                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
535
536                 Map<String, String> peerAddresses = new HashMap<>();
537                 peerAddresses.put(follower1Id, followerActor1.path().toString());
538
539                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
540                         MockRaftActor.props(persistenceId, peerAddresses,
541                                 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
542
543                 MockRaftActor leaderActor = mockActorRef.underlyingActor();
544
545                 leaderActor.getRaftActorContext().setCommitIndex(4);
546                 leaderActor.getRaftActorContext().setLastApplied(4);
547                 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
548
549                 leaderActor.waitForInitializeBehaviorComplete();
550
551                 // create 8 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
552
553                 Leader leader = new Leader(leaderActor.getRaftActorContext());
554                 leaderActor.setCurrentBehavior(leader);
555                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
556
557                 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
558                 leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 8, 1).build());
559
560                 assertEquals(8, leaderActor.getReplicatedLog().size());
561
562                 leaderActor.getRaftActorContext().getSnapshotManager()
563                         .capture(new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
564                                 new MockRaftActorContext.MockPayload("x")), 4);
565
566                 verify(leaderActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class));
567
568                 assertEquals(8, leaderActor.getReplicatedLog().size());
569
570                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
571                 //fake snapshot on index 5
572                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 5, 1, (short)0));
573
574                 assertEquals(8, leaderActor.getReplicatedLog().size());
575
576                 //fake snapshot on index 6
577                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
578                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 6, 1, (short)0));
579                 assertEquals(8, leaderActor.getReplicatedLog().size());
580
581                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
582
583                 assertEquals(8, leaderActor.getReplicatedLog().size());
584
585                 ByteString snapshotBytes = fromObject(Arrays.asList(
586                         new MockRaftActorContext.MockPayload("foo-0"),
587                         new MockRaftActorContext.MockPayload("foo-1"),
588                         new MockRaftActorContext.MockPayload("foo-2"),
589                         new MockRaftActorContext.MockPayload("foo-3"),
590                         new MockRaftActorContext.MockPayload("foo-4")));
591
592                 leaderActor.getRaftActorContext().getSnapshotManager().persist(snapshotBytes.toByteArray(),
593                         leader, Runtime.getRuntime().totalMemory());
594
595                 assertTrue(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
596
597                 // The commit is needed to complete the snapshot creation process
598                 leaderActor.getRaftActorContext().getSnapshotManager().commit(-1, leader);
599
600                 // capture snapshot reply should remove the snapshotted entries only
601                 assertEquals(3, leaderActor.getReplicatedLog().size());
602                 assertEquals(7, leaderActor.getReplicatedLog().lastIndex());
603
604                 // add another non-replicated entry
605                 leaderActor.getReplicatedLog().append(
606                         new ReplicatedLogImplEntry(8, 1, new MockRaftActorContext.MockPayload("foo-8")));
607
608                 //fake snapshot on index 7, since lastApplied = 7 , we would keep the last applied
609                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 7, 1, (short)0));
610                 assertEquals(2, leaderActor.getReplicatedLog().size());
611                 assertEquals(8, leaderActor.getReplicatedLog().lastIndex());
612
613             }
614         };
615     }
616
617     @Test
618     public void testFakeSnapshotsForFollowerWithInRealSnapshots() throws Exception {
619         new JavaTestKit(getSystem()) {
620             {
621                 String persistenceId = factory.generateActorId("follower-");
622                 String leaderId = factory.generateActorId("leader-");
623
624
625                 ActorRef leaderActor1 =
626                         factory.createActor(Props.create(MessageCollectorActor.class));
627
628                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
629                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
630                 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
631
632                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
633
634                 Map<String, String> peerAddresses = new HashMap<>();
635                 peerAddresses.put(leaderId, leaderActor1.path().toString());
636
637                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
638                         MockRaftActor.props(persistenceId, peerAddresses,
639                                 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
640
641                 MockRaftActor followerActor = mockActorRef.underlyingActor();
642                 followerActor.getRaftActorContext().setCommitIndex(4);
643                 followerActor.getRaftActorContext().setLastApplied(4);
644                 followerActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
645
646                 followerActor.waitForInitializeBehaviorComplete();
647
648
649                 Follower follower = new Follower(followerActor.getRaftActorContext());
650                 followerActor.setCurrentBehavior(follower);
651                 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
652
653                 // create 6 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
654                 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
655                 followerActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 6, 1).build());
656
657                 // log has indices 0-5
658                 assertEquals(6, followerActor.getReplicatedLog().size());
659
660                 //snapshot on 4
661                 followerActor.getRaftActorContext().getSnapshotManager().capture(
662                         new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
663                                 new MockRaftActorContext.MockPayload("D")), 4);
664
665                 verify(followerActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class));
666
667                 assertEquals(6, followerActor.getReplicatedLog().size());
668
669                 //fake snapshot on index 6
670                 List<ReplicatedLogEntry> entries =
671                         Arrays.asList(
672                                 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
673                                         new MockRaftActorContext.MockPayload("foo-6"))
674                         );
675                 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 5, 1, entries, 5, 5, (short)0));
676                 assertEquals(7, followerActor.getReplicatedLog().size());
677
678                 //fake snapshot on index 7
679                 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
680
681                 entries =
682                         Arrays.asList(
683                                 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
684                                         new MockRaftActorContext.MockPayload("foo-7"))
685                         );
686                 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 6, 1, entries, 6, 6, (short) 0));
687                 assertEquals(8, followerActor.getReplicatedLog().size());
688
689                 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
690
691
692                 ByteString snapshotBytes = fromObject(Arrays.asList(
693                         new MockRaftActorContext.MockPayload("foo-0"),
694                         new MockRaftActorContext.MockPayload("foo-1"),
695                         new MockRaftActorContext.MockPayload("foo-2"),
696                         new MockRaftActorContext.MockPayload("foo-3"),
697                         new MockRaftActorContext.MockPayload("foo-4")));
698                 followerActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
699                 assertTrue(followerActor.getRaftActorContext().getSnapshotManager().isCapturing());
700
701                 // The commit is needed to complete the snapshot creation process
702                 followerActor.getRaftActorContext().getSnapshotManager().commit(-1, follower);
703
704                 // capture snapshot reply should remove the snapshotted entries only till replicatedToAllIndex
705                 assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log
706                 assertEquals(7, followerActor.getReplicatedLog().lastIndex());
707
708                 entries =
709                         Arrays.asList(
710                                 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 8,
711                                         new MockRaftActorContext.MockPayload("foo-7"))
712                         );
713                 // send an additional entry 8 with leaderCommit = 7
714                 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 7, 1, entries, 7, 7, (short) 0));
715
716                 // 7 and 8, as lastapplied is 7
717                 assertEquals(2, followerActor.getReplicatedLog().size());
718
719             }
720         };
721     }
722
723     @Test
724     public void testFakeSnapshotsForLeaderWithInInitiateSnapshots() throws Exception {
725         new JavaTestKit(getSystem()) {
726             {
727                 String persistenceId = factory.generateActorId("leader-");
728                 String follower1Id = factory.generateActorId("follower-");
729                 String follower2Id = factory.generateActorId("follower-");
730
731                 ActorRef followerActor1 =
732                         factory.createActor(Props.create(MessageCollectorActor.class), follower1Id);
733                 ActorRef followerActor2 =
734                         factory.createActor(Props.create(MessageCollectorActor.class), follower2Id);
735
736                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
737                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
738                 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
739
740                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
741
742                 Map<String, String> peerAddresses = new HashMap<>();
743                 peerAddresses.put(follower1Id, followerActor1.path().toString());
744                 peerAddresses.put(follower2Id, followerActor2.path().toString());
745
746                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
747                         MockRaftActor.props(persistenceId, peerAddresses,
748                                 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
749
750                 MockRaftActor leaderActor = mockActorRef.underlyingActor();
751                 leaderActor.getRaftActorContext().setCommitIndex(9);
752                 leaderActor.getRaftActorContext().setLastApplied(9);
753                 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
754
755                 leaderActor.waitForInitializeBehaviorComplete();
756
757                 Leader leader = new Leader(leaderActor.getRaftActorContext());
758                 leaderActor.setCurrentBehavior(leader);
759                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
760
761                 // create 5 entries in the log
762                 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
763                 leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(5, 10, 1).build());
764
765                 //set the snapshot index to 4 , 0 to 4 are snapshotted
766                 leaderActor.getRaftActorContext().getReplicatedLog().setSnapshotIndex(4);
767                 //setting replicatedToAllIndex = 9, for the log to clear
768                 leader.setReplicatedToAllIndex(9);
769                 assertEquals(5, leaderActor.getReplicatedLog().size());
770                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
771
772                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 9, 1, (short) 0));
773                 assertEquals(5, leaderActor.getReplicatedLog().size());
774                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
775
776                 // set the 2nd follower nextIndex to 1 which has been snapshotted
777                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 0, 1, (short)0));
778                 assertEquals(5, leaderActor.getReplicatedLog().size());
779                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
780
781                 // simulate a real snapshot
782                 leaderActor.onReceiveCommand(new SendHeartBeat());
783                 assertEquals(5, leaderActor.getReplicatedLog().size());
784                 assertEquals(String.format("expected to be Leader but was %s. Current Leader = %s ",
785                         leaderActor.getCurrentBehavior().state(), leaderActor.getLeaderId())
786                         , RaftState.Leader, leaderActor.getCurrentBehavior().state());
787
788
789                 //reply from a slow follower does not initiate a fake snapshot
790                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 9, 1, (short)0));
791                 assertEquals("Fake snapshot should not happen when Initiate is in progress", 5, leaderActor.getReplicatedLog().size());
792
793                 ByteString snapshotBytes = fromObject(Arrays.asList(
794                         new MockRaftActorContext.MockPayload("foo-0"),
795                         new MockRaftActorContext.MockPayload("foo-1"),
796                         new MockRaftActorContext.MockPayload("foo-2"),
797                         new MockRaftActorContext.MockPayload("foo-3"),
798                         new MockRaftActorContext.MockPayload("foo-4")));
799                 leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
800                 assertTrue(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
801
802                 assertEquals("Real snapshot didn't clear the log till replicatedToAllIndex", 0, leaderActor.getReplicatedLog().size());
803
804                 //reply from a slow follower after should not raise errors
805                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 5, 1, (short) 0));
806                 assertEquals(0, leaderActor.getReplicatedLog().size());
807             }
808         };
809     }
810
811     @Test
812     public void testRealSnapshotWhenReplicatedToAllIndexMinusOne() throws Exception {
813         new JavaTestKit(getSystem()) {{
814             String persistenceId = factory.generateActorId("leader-");
815             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
816             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
817             config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
818             config.setSnapshotBatchCount(5);
819
820             DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider();
821
822             Map<String, String> peerAddresses = ImmutableMap.<String, String>builder().put("member1", "address").build();
823
824             TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
825                     MockRaftActor.props(persistenceId, peerAddresses,
826                             Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
827
828             MockRaftActor leaderActor = mockActorRef.underlyingActor();
829             leaderActor.getRaftActorContext().setCommitIndex(3);
830             leaderActor.getRaftActorContext().setLastApplied(3);
831             leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
832
833             leaderActor.waitForInitializeBehaviorComplete();
834             for(int i=0;i< 4;i++) {
835                 leaderActor.getReplicatedLog()
836                         .append(new MockRaftActorContext.MockReplicatedLogEntry(1, i,
837                                 new MockRaftActorContext.MockPayload("A")));
838             }
839
840             Leader leader = new Leader(leaderActor.getRaftActorContext());
841             leaderActor.setCurrentBehavior(leader);
842             assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
843
844             // Simulate an install snaphost to a follower.
845             leaderActor.getRaftActorContext().getSnapshotManager().captureToInstall(
846                     leaderActor.getReplicatedLog().last(), -1, "member1");
847
848             // Now send a CaptureSnapshotReply
849             mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
850
851             // Trimming log in this scenario is a no-op
852             assertEquals(-1, leaderActor.getReplicatedLog().getSnapshotIndex());
853             assertTrue(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
854             assertEquals(-1, leader.getReplicatedToAllIndex());
855
856         }};
857     }
858
859     @Test
860     public void testRealSnapshotWhenReplicatedToAllIndexNotInReplicatedLog() throws Exception {
861         new JavaTestKit(getSystem()) {{
862             String persistenceId = factory.generateActorId("leader-");
863             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
864             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
865             config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
866             config.setSnapshotBatchCount(5);
867
868             DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider();
869
870             Map<String, String> peerAddresses = ImmutableMap.<String, String>builder().put("member1", "address").build();
871
872             TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
873                     MockRaftActor.props(persistenceId, peerAddresses,
874                             Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
875
876             MockRaftActor leaderActor = mockActorRef.underlyingActor();
877             leaderActor.getRaftActorContext().setCommitIndex(3);
878             leaderActor.getRaftActorContext().setLastApplied(3);
879             leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
880             leaderActor.getReplicatedLog().setSnapshotIndex(3);
881
882             leaderActor.waitForInitializeBehaviorComplete();
883             Leader leader = new Leader(leaderActor.getRaftActorContext());
884             leaderActor.setCurrentBehavior(leader);
885             leader.setReplicatedToAllIndex(3);
886             assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
887
888             // Persist another entry (this will cause a CaptureSnapshot to be triggered
889             leaderActor.persistData(mockActorRef, "x", new MockRaftActorContext.MockPayload("duh"));
890
891             // Now send a CaptureSnapshotReply
892             mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
893
894             // Trimming log in this scenario is a no-op
895             assertEquals(3, leaderActor.getReplicatedLog().getSnapshotIndex());
896             assertTrue(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
897             assertEquals(3, leader.getReplicatedToAllIndex());
898
899         }};
900     }
901
902     @Test
903     public void testSwitchBehavior(){
904         String persistenceId = factory.generateActorId("leader-");
905         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
906         config.setCustomRaftPolicyImplementationClass("org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy");
907         config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
908         config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
909         config.setSnapshotBatchCount(5);
910
911         DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider();
912
913         Map<String, String> peerAddresses = ImmutableMap.<String, String>builder().build();
914
915         TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
916                 MockRaftActor.props(persistenceId, peerAddresses,
917                         Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
918
919         MockRaftActor leaderActor = mockActorRef.underlyingActor();
920
921         leaderActor.handleRecover(RecoveryCompleted.getInstance());
922
923         leaderActor.handleCommand(new SwitchBehavior(RaftState.Follower, 100));
924
925         assertEquals(100, leaderActor.getRaftActorContext().getTermInformation().getCurrentTerm());
926         assertEquals(RaftState.Follower, leaderActor.getCurrentBehavior().state());
927
928         leaderActor.handleCommand(new SwitchBehavior(RaftState.Leader, 110));
929
930         assertEquals(110, leaderActor.getRaftActorContext().getTermInformation().getCurrentTerm());
931         assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
932
933         leaderActor.handleCommand(new SwitchBehavior(RaftState.Candidate, 125));
934
935         assertEquals(110, leaderActor.getRaftActorContext().getTermInformation().getCurrentTerm());
936         assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
937
938         leaderActor.handleCommand(new SwitchBehavior(RaftState.IsolatedLeader, 125));
939
940         assertEquals(110, leaderActor.getRaftActorContext().getTermInformation().getCurrentTerm());
941         assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
942
943
944     }
945
946     public static ByteString fromObject(Object snapshot) throws Exception {
947         ByteArrayOutputStream b = null;
948         ObjectOutputStream o = null;
949         try {
950             b = new ByteArrayOutputStream();
951             o = new ObjectOutputStream(b);
952             o.writeObject(snapshot);
953             byte[] snapshotBytes = b.toByteArray();
954             return ByteString.copyFrom(snapshotBytes);
955         } finally {
956             if (o != null) {
957                 o.flush();
958                 o.close();
959             }
960             if (b != null) {
961                 b.close();
962             }
963         }
964     }
965
966 }