Add new DeleteEntries class wih long "fromIndex" field
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RaftActorTest.java
1 package org.opendaylight.controller.cluster.raft;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertNull;
7 import static org.mockito.Matchers.any;
8 import static org.mockito.Matchers.anyObject;
9 import static org.mockito.Matchers.eq;
10 import static org.mockito.Matchers.same;
11 import static org.mockito.Mockito.doReturn;
12 import static org.mockito.Mockito.mock;
13 import static org.mockito.Mockito.times;
14 import static org.mockito.Mockito.verify;
15 import akka.actor.ActorRef;
16 import akka.actor.PoisonPill;
17 import akka.actor.Props;
18 import akka.actor.Terminated;
19 import akka.japi.Procedure;
20 import akka.persistence.SaveSnapshotFailure;
21 import akka.persistence.SaveSnapshotSuccess;
22 import akka.persistence.SnapshotMetadata;
23 import akka.persistence.SnapshotOffer;
24 import akka.testkit.JavaTestKit;
25 import akka.testkit.TestActorRef;
26 import com.google.common.base.Optional;
27 import com.google.common.collect.ImmutableMap;
28 import com.google.common.util.concurrent.Uninterruptibles;
29 import com.google.protobuf.ByteString;
30 import java.io.ByteArrayOutputStream;
31 import java.io.ObjectOutputStream;
32 import java.util.ArrayList;
33 import java.util.Arrays;
34 import java.util.Collections;
35 import java.util.HashMap;
36 import java.util.List;
37 import java.util.Map;
38 import java.util.concurrent.CountDownLatch;
39 import java.util.concurrent.TimeUnit;
40 import org.junit.After;
41 import org.junit.Before;
42 import org.junit.Test;
43 import org.opendaylight.controller.cluster.DataPersistenceProvider;
44 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
45 import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor;
46 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
47 import org.opendaylight.controller.cluster.notifications.RoleChanged;
48 import org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm;
49 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
50 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
51 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
52 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
53 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
54 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
55 import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries;
56 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
57 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
58 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
59 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
60 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
61 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
62 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
63 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
64 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
65 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
66 import scala.concurrent.duration.FiniteDuration;
67
68 public class RaftActorTest extends AbstractActorTest {
69
70     private TestActorFactory factory;
71
72     @Before
73     public void setUp(){
74         factory = new TestActorFactory(getSystem());
75     }
76
77     @After
78     public void tearDown() throws Exception {
79         factory.close();
80         InMemoryJournal.clear();
81         InMemorySnapshotStore.clear();
82     }
83
84     @Test
85     public void testConstruction() {
86         new RaftActorTestKit(getSystem(), "testConstruction").waitUntilLeader();
87     }
88
89     @Test
90     public void testFindLeaderWhenLeaderIsSelf(){
91         RaftActorTestKit kit = new RaftActorTestKit(getSystem(), "testFindLeader");
92         kit.waitUntilLeader();
93     }
94
95     @Test
96     public void testRaftActorRecoveryWithPersistenceEnabled() throws Exception {
97         new JavaTestKit(getSystem()) {{
98             String persistenceId = factory.generateActorId("follower-");
99
100             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
101
102             // Set the heartbeat interval high to essentially disable election otherwise the test
103             // may fail if the actor is switched to Leader and the commitIndex is set to the last
104             // log entry.
105             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
106
107             ActorRef followerActor = factory.createActor(MockRaftActor.props(persistenceId,
108                     ImmutableMap.<String, String>builder().put("member1", "address").build(),
109                     Optional.<ConfigParams>of(config)), persistenceId);
110
111             watch(followerActor);
112
113             List<ReplicatedLogEntry> snapshotUnappliedEntries = new ArrayList<>();
114             ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 4,
115                     new MockRaftActorContext.MockPayload("E"));
116             snapshotUnappliedEntries.add(entry1);
117
118             int lastAppliedDuringSnapshotCapture = 3;
119             int lastIndexDuringSnapshotCapture = 4;
120
121             // 4 messages as part of snapshot, which are applied to state
122             ByteString snapshotBytes = fromObject(Arrays.asList(
123                     new MockRaftActorContext.MockPayload("A"),
124                     new MockRaftActorContext.MockPayload("B"),
125                     new MockRaftActorContext.MockPayload("C"),
126                     new MockRaftActorContext.MockPayload("D")));
127
128             Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
129                     snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1,
130                     lastAppliedDuringSnapshotCapture, 1);
131             InMemorySnapshotStore.addSnapshot(persistenceId, snapshot);
132
133             // add more entries after snapshot is taken
134             List<ReplicatedLogEntry> entries = new ArrayList<>();
135             ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
136                     new MockRaftActorContext.MockPayload("F", 2));
137             ReplicatedLogEntry entry3 = new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
138                     new MockRaftActorContext.MockPayload("G", 3));
139             ReplicatedLogEntry entry4 = new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
140                     new MockRaftActorContext.MockPayload("H", 4));
141             entries.add(entry2);
142             entries.add(entry3);
143             entries.add(entry4);
144
145             int lastAppliedToState = 5;
146             int lastIndex = 7;
147
148             InMemoryJournal.addEntry(persistenceId, 5, entry2);
149             // 2 entries are applied to state besides the 4 entries in snapshot
150             InMemoryJournal.addEntry(persistenceId, 6, new ApplyJournalEntries(lastAppliedToState));
151             InMemoryJournal.addEntry(persistenceId, 7, entry3);
152             InMemoryJournal.addEntry(persistenceId, 8, entry4);
153
154             // kill the actor
155             followerActor.tell(PoisonPill.getInstance(), null);
156             expectMsgClass(duration("5 seconds"), Terminated.class);
157
158             unwatch(followerActor);
159
160             //reinstate the actor
161             TestActorRef<MockRaftActor> ref = factory.createTestActor(
162                     MockRaftActor.props(persistenceId, Collections.<String, String>emptyMap(),
163                             Optional.<ConfigParams>of(config)));
164
165             MockRaftActor mockRaftActor = ref.underlyingActor();
166
167             mockRaftActor.waitForRecoveryComplete();
168
169             RaftActorContext context = mockRaftActor.getRaftActorContext();
170             assertEquals("Journal log size", snapshotUnappliedEntries.size() + entries.size(),
171                     context.getReplicatedLog().size());
172             assertEquals("Journal data size", 10, context.getReplicatedLog().dataSize());
173             assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
174             assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
175             assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
176             assertEquals("Recovered state size", 6, mockRaftActor.getState().size());
177
178             mockRaftActor.waitForInitializeBehaviorComplete();
179
180             assertEquals("getRaftState", RaftState.Follower, mockRaftActor.getRaftState());
181         }};
182     }
183
184     @Test
185     public void testRaftActorRecoveryWithPersistenceDisabled() throws Exception {
186         new JavaTestKit(getSystem()) {{
187             String persistenceId = factory.generateActorId("follower-");
188
189             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
190
191             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
192
193             TestActorRef<MockRaftActor> ref = factory.createTestActor(MockRaftActor.props(persistenceId,
194                     ImmutableMap.<String, String>builder().put("member1", "address").build(),
195                     Optional.<ConfigParams>of(config), new NonPersistentDataProvider()), persistenceId);
196
197             MockRaftActor mockRaftActor = ref.underlyingActor();
198
199             mockRaftActor.waitForRecoveryComplete();
200
201             mockRaftActor.waitForInitializeBehaviorComplete();
202
203             assertEquals("getRaftState", RaftState.Follower, mockRaftActor.getRaftState());
204         }};
205     }
206
207     @Test
208     public void testRaftActorForwardsToRaftActorRecoverySupport() {
209         String persistenceId = factory.generateActorId("leader-");
210
211         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
212
213         config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
214
215         TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
216                 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config)), persistenceId);
217
218         MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
219
220         // Wait for akka's recovery to complete so it doesn't interfere.
221         mockRaftActor.waitForRecoveryComplete();
222
223         RaftActorRecoverySupport mockSupport = mock(RaftActorRecoverySupport.class);
224         mockRaftActor.setRaftActorRecoverySupport(mockSupport );
225
226         Snapshot snapshot = Snapshot.create(new byte[]{1}, Collections.<ReplicatedLogEntry>emptyList(), 3, 1, 3, 1);
227         SnapshotOffer snapshotOffer = new SnapshotOffer(new SnapshotMetadata("test", 6, 12345), snapshot);
228         mockRaftActor.handleRecover(snapshotOffer);
229
230         MockRaftActorContext.MockReplicatedLogEntry logEntry = new MockRaftActorContext.MockReplicatedLogEntry(1,
231                 1, new MockRaftActorContext.MockPayload("1", 5));
232         mockRaftActor.handleRecover(logEntry);
233
234         ApplyJournalEntries applyJournalEntries = new ApplyJournalEntries(2);
235         mockRaftActor.handleRecover(applyJournalEntries);
236
237         ApplyLogEntries applyLogEntries = new ApplyLogEntries(0);
238         mockRaftActor.handleRecover(applyLogEntries);
239
240         DeleteEntries deleteEntries = new DeleteEntries(1);
241         mockRaftActor.handleRecover(deleteEntries);
242
243         org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries deprecatedDeleteEntries =
244                 new org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries(1);
245         mockRaftActor.handleRecover(deprecatedDeleteEntries);
246
247         UpdateElectionTerm updateElectionTerm = new UpdateElectionTerm(5, "member2");
248         mockRaftActor.handleRecover(updateElectionTerm);
249
250         verify(mockSupport).handleRecoveryMessage(same(snapshotOffer));
251         verify(mockSupport).handleRecoveryMessage(same(logEntry));
252         verify(mockSupport).handleRecoveryMessage(same(applyJournalEntries));
253         verify(mockSupport).handleRecoveryMessage(same(applyLogEntries));
254         verify(mockSupport).handleRecoveryMessage(same(deleteEntries));
255         verify(mockSupport).handleRecoveryMessage(same(deprecatedDeleteEntries));
256         verify(mockSupport).handleRecoveryMessage(same(updateElectionTerm));
257     }
258
259     @Test
260     public void testRaftActorForwardsToRaftActorSnapshotMessageSupport() {
261         String persistenceId = factory.generateActorId("leader-");
262
263         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
264
265         config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
266
267         RaftActorSnapshotMessageSupport mockSupport = mock(RaftActorSnapshotMessageSupport.class);
268
269         TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
270                 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), mockSupport), persistenceId);
271
272         MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
273
274         // Wait for akka's recovery to complete so it doesn't interfere.
275         mockRaftActor.waitForRecoveryComplete();
276
277         ApplySnapshot applySnapshot = new ApplySnapshot(mock(Snapshot.class));
278         doReturn(true).when(mockSupport).handleSnapshotMessage(same(applySnapshot));
279         mockRaftActor.handleCommand(applySnapshot);
280
281         CaptureSnapshot captureSnapshot = new CaptureSnapshot(1, 1, 1, 1, 0, 1);
282         doReturn(true).when(mockSupport).handleSnapshotMessage(same(captureSnapshot));
283         mockRaftActor.handleCommand(captureSnapshot);
284
285         CaptureSnapshotReply captureSnapshotReply = new CaptureSnapshotReply(new byte[0]);
286         doReturn(true).when(mockSupport).handleSnapshotMessage(same(captureSnapshotReply));
287         mockRaftActor.handleCommand(captureSnapshotReply);
288
289         SaveSnapshotSuccess saveSnapshotSuccess = new SaveSnapshotSuccess(mock(SnapshotMetadata.class));
290         doReturn(true).when(mockSupport).handleSnapshotMessage(same(saveSnapshotSuccess));
291         mockRaftActor.handleCommand(saveSnapshotSuccess);
292
293         SaveSnapshotFailure saveSnapshotFailure = new SaveSnapshotFailure(mock(SnapshotMetadata.class), new Throwable());
294         doReturn(true).when(mockSupport).handleSnapshotMessage(same(saveSnapshotFailure));
295         mockRaftActor.handleCommand(saveSnapshotFailure);
296
297         doReturn(true).when(mockSupport).handleSnapshotMessage(same(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT));
298         mockRaftActor.handleCommand(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT);
299
300         verify(mockSupport).handleSnapshotMessage(same(applySnapshot));
301         verify(mockSupport).handleSnapshotMessage(same(captureSnapshot));
302         verify(mockSupport).handleSnapshotMessage(same(captureSnapshotReply));
303         verify(mockSupport).handleSnapshotMessage(same(saveSnapshotSuccess));
304         verify(mockSupport).handleSnapshotMessage(same(saveSnapshotFailure));
305         verify(mockSupport).handleSnapshotMessage(same(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT));
306     }
307
308     @Test
309     public void testUpdatingElectionTermCallsDataPersistence() throws Exception {
310         new JavaTestKit(getSystem()) {
311             {
312                 String persistenceId = factory.generateActorId("leader-");
313
314                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
315
316                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
317
318                 CountDownLatch persistLatch = new CountDownLatch(1);
319                 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
320                 dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
321
322                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
323                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
324
325                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
326
327                 mockRaftActor.waitForInitializeBehaviorComplete();
328
329                 mockRaftActor.getRaftActorContext().getTermInformation().updateAndPersist(10, "foobar");
330
331                 assertEquals("Persist called", true, persistLatch.await(5, TimeUnit.SECONDS));
332             }
333         };
334     }
335
336     @Test
337     public void testAddingReplicatedLogEntryCallsDataPersistence() throws Exception {
338         new JavaTestKit(getSystem()) {
339             {
340                 String persistenceId = factory.generateActorId("leader-");
341
342                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
343
344                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
345
346                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
347
348                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
349                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
350
351                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
352
353                 mockRaftActor.waitForInitializeBehaviorComplete();
354
355                 MockRaftActorContext.MockReplicatedLogEntry logEntry = new MockRaftActorContext.MockReplicatedLogEntry(10, 10, mock(Payload.class));
356
357                 mockRaftActor.getRaftActorContext().getReplicatedLog().appendAndPersist(logEntry);
358
359                 verify(dataPersistenceProvider).persist(eq(logEntry), any(Procedure.class));
360             }
361         };
362     }
363
364     @Test
365     public void testRemovingReplicatedLogEntryCallsDataPersistence() throws Exception {
366         new JavaTestKit(getSystem()) {
367             {
368                 String persistenceId = factory.generateActorId("leader-");
369
370                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
371
372                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
373
374                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
375
376                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
377                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
378
379                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
380
381                 mockRaftActor.waitForInitializeBehaviorComplete();
382
383                 mockRaftActor.waitUntilLeader();
384
385                 mockRaftActor.getReplicatedLog().appendAndPersist(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
386
387                 mockRaftActor.getRaftActorContext().getReplicatedLog().removeFromAndPersist(0);
388
389                 verify(dataPersistenceProvider, times(3)).persist(anyObject(), any(Procedure.class));
390             }
391         };
392     }
393
394     @Test
395     public void testApplyJournalEntriesCallsDataPersistence() throws Exception {
396         new JavaTestKit(getSystem()) {
397             {
398                 String persistenceId = factory.generateActorId("leader-");
399
400                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
401
402                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
403
404                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
405
406                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
407                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
408
409                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
410
411                 mockRaftActor.waitForInitializeBehaviorComplete();
412
413                 mockRaftActor.waitUntilLeader();
414
415                 mockRaftActor.onReceiveCommand(new ApplyJournalEntries(10));
416
417                 verify(dataPersistenceProvider, times(2)).persist(anyObject(), any(Procedure.class));
418
419             }
420
421         };
422     }
423
424     @Test
425     public void testApplyState() throws Exception {
426
427         new JavaTestKit(getSystem()) {
428             {
429                 String persistenceId = factory.generateActorId("leader-");
430
431                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
432
433                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
434
435                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
436
437                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
438                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
439
440                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
441
442                 mockRaftActor.waitForInitializeBehaviorComplete();
443
444                 ReplicatedLogEntry entry = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
445                         new MockRaftActorContext.MockPayload("F"));
446
447                 mockRaftActor.onReceiveCommand(new ApplyState(mockActorRef, "apply-state", entry));
448
449                 verify(mockRaftActor.actorDelegate).applyState(eq(mockActorRef), eq("apply-state"), anyObject());
450
451             }
452         };
453     }
454
455     @Test
456     public void testRaftRoleChangeNotifierWhenRaftActorHasNoPeers() throws Exception {
457         new JavaTestKit(getSystem()) {{
458             TestActorRef<MessageCollectorActor> notifierActor = factory.createTestActor(
459                     Props.create(MessageCollectorActor.class));
460             MessageCollectorActor.waitUntilReady(notifierActor);
461
462             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
463             long heartBeatInterval = 100;
464             config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS));
465             config.setElectionTimeoutFactor(20);
466
467             String persistenceId = factory.generateActorId("notifier-");
468
469             TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
470                     Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), notifierActor,
471                     new NonPersistentDataProvider()), persistenceId);
472
473             List<RoleChanged> matches =  MessageCollectorActor.expectMatching(notifierActor, RoleChanged.class, 3);
474
475
476             // check if the notifier got a role change from null to Follower
477             RoleChanged raftRoleChanged = matches.get(0);
478             assertEquals(persistenceId, raftRoleChanged.getMemberId());
479             assertNull(raftRoleChanged.getOldRole());
480             assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
481
482             // check if the notifier got a role change from Follower to Candidate
483             raftRoleChanged = matches.get(1);
484             assertEquals(persistenceId, raftRoleChanged.getMemberId());
485             assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
486             assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
487
488             // check if the notifier got a role change from Candidate to Leader
489             raftRoleChanged = matches.get(2);
490             assertEquals(persistenceId, raftRoleChanged.getMemberId());
491             assertEquals(RaftState.Candidate.name(), raftRoleChanged.getOldRole());
492             assertEquals(RaftState.Leader.name(), raftRoleChanged.getNewRole());
493
494             LeaderStateChanged leaderStateChange = MessageCollectorActor.expectFirstMatching(
495                     notifierActor, LeaderStateChanged.class);
496
497             assertEquals(raftRoleChanged.getMemberId(), leaderStateChange.getLeaderId());
498
499             notifierActor.underlyingActor().clear();
500
501             MockRaftActor raftActor = raftActorRef.underlyingActor();
502             final String newLeaderId = "new-leader";
503             Follower follower = new Follower(raftActor.getRaftActorContext()) {
504                 @Override
505                 public RaftActorBehavior handleMessage(ActorRef sender, Object message) {
506                     leaderId = newLeaderId;
507                     return this;
508                 }
509             };
510
511             raftActor.changeCurrentBehavior(follower);
512
513             leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
514             assertEquals(persistenceId, leaderStateChange.getMemberId());
515             assertEquals(null, leaderStateChange.getLeaderId());
516
517             raftRoleChanged = MessageCollectorActor.expectFirstMatching(notifierActor, RoleChanged.class);
518             assertEquals(RaftState.Leader.name(), raftRoleChanged.getOldRole());
519             assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
520
521             notifierActor.underlyingActor().clear();
522
523             raftActor.handleCommand("any");
524
525             leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
526             assertEquals(persistenceId, leaderStateChange.getMemberId());
527             assertEquals(newLeaderId, leaderStateChange.getLeaderId());
528         }};
529     }
530
531     @Test
532     public void testRaftRoleChangeNotifierWhenRaftActorHasPeers() throws Exception {
533         new JavaTestKit(getSystem()) {{
534             ActorRef notifierActor = factory.createActor(Props.create(MessageCollectorActor.class));
535             MessageCollectorActor.waitUntilReady(notifierActor);
536
537             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
538             long heartBeatInterval = 100;
539             config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS));
540             config.setElectionTimeoutFactor(1);
541
542             String persistenceId = factory.generateActorId("notifier-");
543
544             factory.createActor(MockRaftActor.props(persistenceId,
545                     ImmutableMap.of("leader", "fake/path"), Optional.<ConfigParams>of(config), notifierActor), persistenceId);
546
547             List<RoleChanged> matches =  null;
548             for(int i = 0; i < 5000 / heartBeatInterval; i++) {
549                 matches = MessageCollectorActor.getAllMatching(notifierActor, RoleChanged.class);
550                 assertNotNull(matches);
551                 if(matches.size() == 3) {
552                     break;
553                 }
554                 Uninterruptibles.sleepUninterruptibly(heartBeatInterval, TimeUnit.MILLISECONDS);
555             }
556
557             assertEquals(2, matches.size());
558
559             // check if the notifier got a role change from null to Follower
560             RoleChanged raftRoleChanged = matches.get(0);
561             assertEquals(persistenceId, raftRoleChanged.getMemberId());
562             assertNull(raftRoleChanged.getOldRole());
563             assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
564
565             // check if the notifier got a role change from Follower to Candidate
566             raftRoleChanged = matches.get(1);
567             assertEquals(persistenceId, raftRoleChanged.getMemberId());
568             assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
569             assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
570
571         }};
572     }
573
574     @Test
575     public void testFakeSnapshotsForLeaderWithInRealSnapshots() throws Exception {
576         new JavaTestKit(getSystem()) {
577             {
578                 String persistenceId = factory.generateActorId("leader-");
579                 String follower1Id = factory.generateActorId("follower-");
580
581                 ActorRef followerActor1 =
582                         factory.createActor(Props.create(MessageCollectorActor.class));
583
584                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
585                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
586                 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
587
588                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
589
590                 Map<String, String> peerAddresses = new HashMap<>();
591                 peerAddresses.put(follower1Id, followerActor1.path().toString());
592
593                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
594                         MockRaftActor.props(persistenceId, peerAddresses,
595                                 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
596
597                 MockRaftActor leaderActor = mockActorRef.underlyingActor();
598
599                 leaderActor.getRaftActorContext().setCommitIndex(4);
600                 leaderActor.getRaftActorContext().setLastApplied(4);
601                 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
602
603                 leaderActor.waitForInitializeBehaviorComplete();
604
605                 // create 8 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
606
607                 Leader leader = new Leader(leaderActor.getRaftActorContext());
608                 leaderActor.setCurrentBehavior(leader);
609                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
610
611                 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
612                 leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 8, 1).build());
613
614                 assertEquals(8, leaderActor.getReplicatedLog().size());
615
616                 leaderActor.getRaftActorContext().getSnapshotManager()
617                         .capture(new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
618                                 new MockRaftActorContext.MockPayload("x")), 4);
619
620                 verify(leaderActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class));
621
622                 assertEquals(8, leaderActor.getReplicatedLog().size());
623
624                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
625                 //fake snapshot on index 5
626                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 5, 1));
627
628                 assertEquals(8, leaderActor.getReplicatedLog().size());
629
630                 //fake snapshot on index 6
631                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
632                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 6, 1));
633                 assertEquals(8, leaderActor.getReplicatedLog().size());
634
635                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
636
637                 assertEquals(8, leaderActor.getReplicatedLog().size());
638
639                 ByteString snapshotBytes = fromObject(Arrays.asList(
640                         new MockRaftActorContext.MockPayload("foo-0"),
641                         new MockRaftActorContext.MockPayload("foo-1"),
642                         new MockRaftActorContext.MockPayload("foo-2"),
643                         new MockRaftActorContext.MockPayload("foo-3"),
644                         new MockRaftActorContext.MockPayload("foo-4")));
645
646                 leaderActor.getRaftActorContext().getSnapshotManager().persist(new NonPersistentDataProvider()
647                         , snapshotBytes.toByteArray(), leader, Runtime.getRuntime().totalMemory());
648
649                 assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
650
651                 // The commit is needed to complete the snapshot creation process
652                 leaderActor.getRaftActorContext().getSnapshotManager().commit(new NonPersistentDataProvider(), -1);
653
654                 // capture snapshot reply should remove the snapshotted entries only
655                 assertEquals(3, leaderActor.getReplicatedLog().size());
656                 assertEquals(7, leaderActor.getReplicatedLog().lastIndex());
657
658                 // add another non-replicated entry
659                 leaderActor.getReplicatedLog().append(
660                         new ReplicatedLogImplEntry(8, 1, new MockRaftActorContext.MockPayload("foo-8")));
661
662                 //fake snapshot on index 7, since lastApplied = 7 , we would keep the last applied
663                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 7, 1));
664                 assertEquals(2, leaderActor.getReplicatedLog().size());
665                 assertEquals(8, leaderActor.getReplicatedLog().lastIndex());
666
667             }
668         };
669     }
670
671     @Test
672     public void testFakeSnapshotsForFollowerWithInRealSnapshots() throws Exception {
673         new JavaTestKit(getSystem()) {
674             {
675                 String persistenceId = factory.generateActorId("follower-");
676                 String leaderId = factory.generateActorId("leader-");
677
678
679                 ActorRef leaderActor1 =
680                         factory.createActor(Props.create(MessageCollectorActor.class));
681
682                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
683                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
684                 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
685
686                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
687
688                 Map<String, String> peerAddresses = new HashMap<>();
689                 peerAddresses.put(leaderId, leaderActor1.path().toString());
690
691                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
692                         MockRaftActor.props(persistenceId, peerAddresses,
693                                 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
694
695                 MockRaftActor followerActor = mockActorRef.underlyingActor();
696                 followerActor.getRaftActorContext().setCommitIndex(4);
697                 followerActor.getRaftActorContext().setLastApplied(4);
698                 followerActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
699
700                 followerActor.waitForInitializeBehaviorComplete();
701
702
703                 Follower follower = new Follower(followerActor.getRaftActorContext());
704                 followerActor.setCurrentBehavior(follower);
705                 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
706
707                 // create 6 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
708                 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
709                 followerActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 6, 1).build());
710
711                 // log has indices 0-5
712                 assertEquals(6, followerActor.getReplicatedLog().size());
713
714                 //snapshot on 4
715                 followerActor.getRaftActorContext().getSnapshotManager().capture(
716                         new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
717                                 new MockRaftActorContext.MockPayload("D")), 4);
718
719                 verify(followerActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class));
720
721                 assertEquals(6, followerActor.getReplicatedLog().size());
722
723                 //fake snapshot on index 6
724                 List<ReplicatedLogEntry> entries =
725                         Arrays.asList(
726                                 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
727                                         new MockRaftActorContext.MockPayload("foo-6"))
728                         );
729                 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 5, 1, entries, 5, 5));
730                 assertEquals(7, followerActor.getReplicatedLog().size());
731
732                 //fake snapshot on index 7
733                 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
734
735                 entries =
736                         Arrays.asList(
737                                 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
738                                         new MockRaftActorContext.MockPayload("foo-7"))
739                         );
740                 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 6, 1, entries, 6, 6));
741                 assertEquals(8, followerActor.getReplicatedLog().size());
742
743                 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
744
745
746                 ByteString snapshotBytes = fromObject(Arrays.asList(
747                         new MockRaftActorContext.MockPayload("foo-0"),
748                         new MockRaftActorContext.MockPayload("foo-1"),
749                         new MockRaftActorContext.MockPayload("foo-2"),
750                         new MockRaftActorContext.MockPayload("foo-3"),
751                         new MockRaftActorContext.MockPayload("foo-4")));
752                 followerActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
753                 assertFalse(followerActor.getRaftActorContext().getSnapshotManager().isCapturing());
754
755                 // The commit is needed to complete the snapshot creation process
756                 followerActor.getRaftActorContext().getSnapshotManager().commit(new NonPersistentDataProvider(), -1);
757
758                 // capture snapshot reply should remove the snapshotted entries only till replicatedToAllIndex
759                 assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log
760                 assertEquals(7, followerActor.getReplicatedLog().lastIndex());
761
762                 entries =
763                         Arrays.asList(
764                                 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 8,
765                                         new MockRaftActorContext.MockPayload("foo-7"))
766                         );
767                 // send an additional entry 8 with leaderCommit = 7
768                 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 7, 1, entries, 7, 7));
769
770                 // 7 and 8, as lastapplied is 7
771                 assertEquals(2, followerActor.getReplicatedLog().size());
772
773             }
774         };
775     }
776
777     @Test
778     public void testFakeSnapshotsForLeaderWithInInitiateSnapshots() throws Exception {
779         new JavaTestKit(getSystem()) {
780             {
781                 String persistenceId = factory.generateActorId("leader-");
782                 String follower1Id = factory.generateActorId("follower-");
783                 String follower2Id = factory.generateActorId("follower-");
784
785                 ActorRef followerActor1 =
786                         factory.createActor(Props.create(MessageCollectorActor.class), follower1Id);
787                 ActorRef followerActor2 =
788                         factory.createActor(Props.create(MessageCollectorActor.class), follower2Id);
789
790                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
791                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
792                 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
793
794                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
795
796                 Map<String, String> peerAddresses = new HashMap<>();
797                 peerAddresses.put(follower1Id, followerActor1.path().toString());
798                 peerAddresses.put(follower2Id, followerActor2.path().toString());
799
800                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
801                         MockRaftActor.props(persistenceId, peerAddresses,
802                                 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
803
804                 MockRaftActor leaderActor = mockActorRef.underlyingActor();
805                 leaderActor.getRaftActorContext().setCommitIndex(9);
806                 leaderActor.getRaftActorContext().setLastApplied(9);
807                 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
808
809                 leaderActor.waitForInitializeBehaviorComplete();
810
811                 Leader leader = new Leader(leaderActor.getRaftActorContext());
812                 leaderActor.setCurrentBehavior(leader);
813                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
814
815                 // create 5 entries in the log
816                 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
817                 leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(5, 10, 1).build());
818
819                 //set the snapshot index to 4 , 0 to 4 are snapshotted
820                 leaderActor.getRaftActorContext().getReplicatedLog().setSnapshotIndex(4);
821                 //setting replicatedToAllIndex = 9, for the log to clear
822                 leader.setReplicatedToAllIndex(9);
823                 assertEquals(5, leaderActor.getReplicatedLog().size());
824                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
825
826                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 9, 1));
827                 assertEquals(5, leaderActor.getReplicatedLog().size());
828                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
829
830                 // set the 2nd follower nextIndex to 1 which has been snapshotted
831                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 0, 1));
832                 assertEquals(5, leaderActor.getReplicatedLog().size());
833                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
834
835                 // simulate a real snapshot
836                 leaderActor.onReceiveCommand(new SendHeartBeat());
837                 assertEquals(5, leaderActor.getReplicatedLog().size());
838                 assertEquals(String.format("expected to be Leader but was %s. Current Leader = %s ",
839                         leaderActor.getCurrentBehavior().state(), leaderActor.getLeaderId())
840                         , RaftState.Leader, leaderActor.getCurrentBehavior().state());
841
842
843                 //reply from a slow follower does not initiate a fake snapshot
844                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 9, 1));
845                 assertEquals("Fake snapshot should not happen when Initiate is in progress", 5, leaderActor.getReplicatedLog().size());
846
847                 ByteString snapshotBytes = fromObject(Arrays.asList(
848                         new MockRaftActorContext.MockPayload("foo-0"),
849                         new MockRaftActorContext.MockPayload("foo-1"),
850                         new MockRaftActorContext.MockPayload("foo-2"),
851                         new MockRaftActorContext.MockPayload("foo-3"),
852                         new MockRaftActorContext.MockPayload("foo-4")));
853                 leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
854                 assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
855
856                 assertEquals("Real snapshot didn't clear the log till replicatedToAllIndex", 0, leaderActor.getReplicatedLog().size());
857
858                 //reply from a slow follower after should not raise errors
859                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 5, 1));
860                 assertEquals(0, leaderActor.getReplicatedLog().size());
861             }
862         };
863     }
864
865     @Test
866     public void testRealSnapshotWhenReplicatedToAllIndexMinusOne() throws Exception {
867         new JavaTestKit(getSystem()) {{
868             String persistenceId = factory.generateActorId("leader-");
869             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
870             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
871             config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
872             config.setSnapshotBatchCount(5);
873
874             DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider();
875
876             Map<String, String> peerAddresses = new HashMap<>();
877
878             TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
879                     MockRaftActor.props(persistenceId, peerAddresses,
880                             Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
881
882             MockRaftActor leaderActor = mockActorRef.underlyingActor();
883             leaderActor.getRaftActorContext().setCommitIndex(3);
884             leaderActor.getRaftActorContext().setLastApplied(3);
885             leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
886
887             leaderActor.waitForInitializeBehaviorComplete();
888             for(int i=0;i< 4;i++) {
889                 leaderActor.getReplicatedLog()
890                         .append(new MockRaftActorContext.MockReplicatedLogEntry(1, i,
891                                 new MockRaftActorContext.MockPayload("A")));
892             }
893
894             Leader leader = new Leader(leaderActor.getRaftActorContext());
895             leaderActor.setCurrentBehavior(leader);
896             assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
897
898             // Persist another entry (this will cause a CaptureSnapshot to be triggered
899             leaderActor.persistData(mockActorRef, "x", new MockRaftActorContext.MockPayload("duh"));
900
901             // Now send a CaptureSnapshotReply
902             mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
903
904             // Trimming log in this scenario is a no-op
905             assertEquals(-1, leaderActor.getReplicatedLog().getSnapshotIndex());
906             assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
907             assertEquals(-1, leader.getReplicatedToAllIndex());
908
909         }};
910     }
911
912     @Test
913     public void testRealSnapshotWhenReplicatedToAllIndexNotInReplicatedLog() throws Exception {
914         new JavaTestKit(getSystem()) {{
915             String persistenceId = factory.generateActorId("leader-");
916             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
917             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
918             config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
919             config.setSnapshotBatchCount(5);
920
921             DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider();
922
923             Map<String, String> peerAddresses = new HashMap<>();
924
925             TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
926                     MockRaftActor.props(persistenceId, peerAddresses,
927                             Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
928
929             MockRaftActor leaderActor = mockActorRef.underlyingActor();
930             leaderActor.getRaftActorContext().setCommitIndex(3);
931             leaderActor.getRaftActorContext().setLastApplied(3);
932             leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
933             leaderActor.getReplicatedLog().setSnapshotIndex(3);
934
935             leaderActor.waitForInitializeBehaviorComplete();
936             Leader leader = new Leader(leaderActor.getRaftActorContext());
937             leaderActor.setCurrentBehavior(leader);
938             leader.setReplicatedToAllIndex(3);
939             assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
940
941             // Persist another entry (this will cause a CaptureSnapshot to be triggered
942             leaderActor.persistData(mockActorRef, "x", new MockRaftActorContext.MockPayload("duh"));
943
944             // Now send a CaptureSnapshotReply
945             mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
946
947             // Trimming log in this scenario is a no-op
948             assertEquals(3, leaderActor.getReplicatedLog().getSnapshotIndex());
949             assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
950             assertEquals(3, leader.getReplicatedToAllIndex());
951
952         }};
953     }
954
955     private ByteString fromObject(Object snapshot) throws Exception {
956         ByteArrayOutputStream b = null;
957         ObjectOutputStream o = null;
958         try {
959             b = new ByteArrayOutputStream();
960             o = new ObjectOutputStream(b);
961             o.writeObject(snapshot);
962             byte[] snapshotBytes = b.toByteArray();
963             return ByteString.copyFrom(snapshotBytes);
964         } finally {
965             if (o != null) {
966                 o.flush();
967                 o.close();
968             }
969             if (b != null) {
970                 b.close();
971             }
972         }
973     }
974
975 }