Add unit tests for RaftActorSnapshotMessageSupport
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RaftActorTest.java
1 package org.opendaylight.controller.cluster.raft;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertNull;
7 import static org.mockito.Matchers.any;
8 import static org.mockito.Matchers.anyObject;
9 import static org.mockito.Matchers.eq;
10 import static org.mockito.Matchers.same;
11 import static org.mockito.Mockito.doReturn;
12 import static org.mockito.Mockito.mock;
13 import static org.mockito.Mockito.times;
14 import static org.mockito.Mockito.verify;
15 import akka.actor.ActorRef;
16 import akka.actor.PoisonPill;
17 import akka.actor.Props;
18 import akka.actor.Terminated;
19 import akka.japi.Procedure;
20 import akka.persistence.SaveSnapshotFailure;
21 import akka.persistence.SaveSnapshotSuccess;
22 import akka.persistence.SnapshotMetadata;
23 import akka.persistence.SnapshotOffer;
24 import akka.testkit.JavaTestKit;
25 import akka.testkit.TestActorRef;
26 import com.google.common.base.Optional;
27 import com.google.common.collect.ImmutableMap;
28 import com.google.common.util.concurrent.Uninterruptibles;
29 import com.google.protobuf.ByteString;
30 import java.io.ByteArrayOutputStream;
31 import java.io.ObjectOutputStream;
32 import java.util.ArrayList;
33 import java.util.Arrays;
34 import java.util.Collections;
35 import java.util.HashMap;
36 import java.util.List;
37 import java.util.Map;
38 import java.util.concurrent.CountDownLatch;
39 import java.util.concurrent.TimeUnit;
40 import org.junit.After;
41 import org.junit.Before;
42 import org.junit.Test;
43 import org.opendaylight.controller.cluster.DataPersistenceProvider;
44 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
45 import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor;
46 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
47 import org.opendaylight.controller.cluster.notifications.RoleChanged;
48 import org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries;
49 import org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm;
50 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
51 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
52 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
53 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
54 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
55 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
56 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
57 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
58 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
59 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
60 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
61 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
62 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
63 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
64 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
65 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
66 import scala.concurrent.duration.FiniteDuration;
67
68 public class RaftActorTest extends AbstractActorTest {
69
70     private TestActorFactory factory;
71
72     @Before
73     public void setUp(){
74         factory = new TestActorFactory(getSystem());
75     }
76
77     @After
78     public void tearDown() throws Exception {
79         factory.close();
80         InMemoryJournal.clear();
81         InMemorySnapshotStore.clear();
82     }
83
84     @Test
85     public void testConstruction() {
86         new RaftActorTestKit(getSystem(), "testConstruction").waitUntilLeader();
87     }
88
89     @Test
90     public void testFindLeaderWhenLeaderIsSelf(){
91         RaftActorTestKit kit = new RaftActorTestKit(getSystem(), "testFindLeader");
92         kit.waitUntilLeader();
93     }
94
95     @Test
96     public void testRaftActorRecoveryWithPersistenceEnabled() throws Exception {
97         new JavaTestKit(getSystem()) {{
98             String persistenceId = factory.generateActorId("follower-");
99
100             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
101
102             // Set the heartbeat interval high to essentially disable election otherwise the test
103             // may fail if the actor is switched to Leader and the commitIndex is set to the last
104             // log entry.
105             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
106
107             ActorRef followerActor = factory.createActor(MockRaftActor.props(persistenceId,
108                     ImmutableMap.<String, String>builder().put("member1", "address").build(),
109                     Optional.<ConfigParams>of(config)), persistenceId);
110
111             watch(followerActor);
112
113             List<ReplicatedLogEntry> snapshotUnappliedEntries = new ArrayList<>();
114             ReplicatedLogEntry entry1 = new MockRaftActorContext.MockReplicatedLogEntry(1, 4,
115                     new MockRaftActorContext.MockPayload("E"));
116             snapshotUnappliedEntries.add(entry1);
117
118             int lastAppliedDuringSnapshotCapture = 3;
119             int lastIndexDuringSnapshotCapture = 4;
120
121             // 4 messages as part of snapshot, which are applied to state
122             ByteString snapshotBytes = fromObject(Arrays.asList(
123                     new MockRaftActorContext.MockPayload("A"),
124                     new MockRaftActorContext.MockPayload("B"),
125                     new MockRaftActorContext.MockPayload("C"),
126                     new MockRaftActorContext.MockPayload("D")));
127
128             Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
129                     snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1,
130                     lastAppliedDuringSnapshotCapture, 1);
131             InMemorySnapshotStore.addSnapshot(persistenceId, snapshot);
132
133             // add more entries after snapshot is taken
134             List<ReplicatedLogEntry> entries = new ArrayList<>();
135             ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
136                     new MockRaftActorContext.MockPayload("F", 2));
137             ReplicatedLogEntry entry3 = new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
138                     new MockRaftActorContext.MockPayload("G", 3));
139             ReplicatedLogEntry entry4 = new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
140                     new MockRaftActorContext.MockPayload("H", 4));
141             entries.add(entry2);
142             entries.add(entry3);
143             entries.add(entry4);
144
145             int lastAppliedToState = 5;
146             int lastIndex = 7;
147
148             InMemoryJournal.addEntry(persistenceId, 5, entry2);
149             // 2 entries are applied to state besides the 4 entries in snapshot
150             InMemoryJournal.addEntry(persistenceId, 6, new ApplyJournalEntries(lastAppliedToState));
151             InMemoryJournal.addEntry(persistenceId, 7, entry3);
152             InMemoryJournal.addEntry(persistenceId, 8, entry4);
153
154             // kill the actor
155             followerActor.tell(PoisonPill.getInstance(), null);
156             expectMsgClass(duration("5 seconds"), Terminated.class);
157
158             unwatch(followerActor);
159
160             //reinstate the actor
161             TestActorRef<MockRaftActor> ref = factory.createTestActor(
162                     MockRaftActor.props(persistenceId, Collections.<String, String>emptyMap(),
163                             Optional.<ConfigParams>of(config)));
164
165             MockRaftActor mockRaftActor = ref.underlyingActor();
166
167             mockRaftActor.waitForRecoveryComplete();
168
169             RaftActorContext context = mockRaftActor.getRaftActorContext();
170             assertEquals("Journal log size", snapshotUnappliedEntries.size() + entries.size(),
171                     context.getReplicatedLog().size());
172             assertEquals("Journal data size", 10, context.getReplicatedLog().dataSize());
173             assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
174             assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
175             assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
176             assertEquals("Recovered state size", 6, mockRaftActor.getState().size());
177
178             mockRaftActor.waitForInitializeBehaviorComplete();
179
180             assertEquals("getRaftState", RaftState.Follower, mockRaftActor.getRaftState());
181         }};
182     }
183
184     @Test
185     public void testRaftActorRecoveryWithPersistenceDisabled() throws Exception {
186         new JavaTestKit(getSystem()) {{
187             String persistenceId = factory.generateActorId("follower-");
188
189             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
190
191             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
192
193             TestActorRef<MockRaftActor> ref = factory.createTestActor(MockRaftActor.props(persistenceId,
194                     ImmutableMap.<String, String>builder().put("member1", "address").build(),
195                     Optional.<ConfigParams>of(config), new NonPersistentDataProvider()), persistenceId);
196
197             MockRaftActor mockRaftActor = ref.underlyingActor();
198
199             mockRaftActor.waitForRecoveryComplete();
200
201             mockRaftActor.waitForInitializeBehaviorComplete();
202
203             assertEquals("getRaftState", RaftState.Follower, mockRaftActor.getRaftState());
204         }};
205     }
206
207     @Test
208     public void testRaftActorForwardsToRaftActorRecoverySupport() {
209         String persistenceId = factory.generateActorId("leader-");
210
211         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
212
213         config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
214
215         TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
216                 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config)), persistenceId);
217
218         MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
219
220         // Wait for akka's recovery to complete so it doesn't interfere.
221         mockRaftActor.waitForRecoveryComplete();
222
223         RaftActorRecoverySupport mockSupport = mock(RaftActorRecoverySupport.class);
224         mockRaftActor.setRaftActorRecoverySupport(mockSupport );
225
226         Snapshot snapshot = Snapshot.create(new byte[]{1}, Collections.<ReplicatedLogEntry>emptyList(), 3, 1, 3, 1);
227         SnapshotOffer snapshotOffer = new SnapshotOffer(new SnapshotMetadata("test", 6, 12345), snapshot);
228         mockRaftActor.handleRecover(snapshotOffer);
229
230         MockRaftActorContext.MockReplicatedLogEntry logEntry = new MockRaftActorContext.MockReplicatedLogEntry(1,
231                 1, new MockRaftActorContext.MockPayload("1", 5));
232         mockRaftActor.handleRecover(logEntry);
233
234         ApplyJournalEntries applyJournalEntries = new ApplyJournalEntries(2);
235         mockRaftActor.handleRecover(applyJournalEntries);
236
237         ApplyLogEntries applyLogEntries = new ApplyLogEntries(0);
238         mockRaftActor.handleRecover(applyLogEntries);
239
240         DeleteEntries deleteEntries = new DeleteEntries(1);
241         mockRaftActor.handleRecover(deleteEntries);
242
243         UpdateElectionTerm updateElectionTerm = new UpdateElectionTerm(5, "member2");
244         mockRaftActor.handleRecover(updateElectionTerm);
245
246         verify(mockSupport).handleRecoveryMessage(same(snapshotOffer));
247         verify(mockSupport).handleRecoveryMessage(same(logEntry));
248         verify(mockSupport).handleRecoveryMessage(same(applyJournalEntries));
249         verify(mockSupport).handleRecoveryMessage(same(applyLogEntries));
250         verify(mockSupport).handleRecoveryMessage(same(deleteEntries));
251         verify(mockSupport).handleRecoveryMessage(same(updateElectionTerm));
252     }
253
254     @Test
255     public void testRaftActorForwardsToRaftActorSnapshotMessageSupport() {
256         String persistenceId = factory.generateActorId("leader-");
257
258         DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
259
260         config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
261
262         RaftActorSnapshotMessageSupport mockSupport = mock(RaftActorSnapshotMessageSupport.class);
263
264         TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
265                 Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), mockSupport), persistenceId);
266
267         MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
268
269         // Wait for akka's recovery to complete so it doesn't interfere.
270         mockRaftActor.waitForRecoveryComplete();
271
272         ApplySnapshot applySnapshot = new ApplySnapshot(mock(Snapshot.class));
273         doReturn(true).when(mockSupport).handleSnapshotMessage(same(applySnapshot));
274         mockRaftActor.handleCommand(applySnapshot);
275
276         CaptureSnapshot captureSnapshot = new CaptureSnapshot(1, 1, 1, 1, 0, 1);
277         doReturn(true).when(mockSupport).handleSnapshotMessage(same(captureSnapshot));
278         mockRaftActor.handleCommand(captureSnapshot);
279
280         CaptureSnapshotReply captureSnapshotReply = new CaptureSnapshotReply(new byte[0]);
281         doReturn(true).when(mockSupport).handleSnapshotMessage(same(captureSnapshotReply));
282         mockRaftActor.handleCommand(captureSnapshotReply);
283
284         SaveSnapshotSuccess saveSnapshotSuccess = new SaveSnapshotSuccess(mock(SnapshotMetadata.class));
285         doReturn(true).when(mockSupport).handleSnapshotMessage(same(saveSnapshotSuccess));
286         mockRaftActor.handleCommand(saveSnapshotSuccess);
287
288         SaveSnapshotFailure saveSnapshotFailure = new SaveSnapshotFailure(mock(SnapshotMetadata.class), new Throwable());
289         doReturn(true).when(mockSupport).handleSnapshotMessage(same(saveSnapshotFailure));
290         mockRaftActor.handleCommand(saveSnapshotFailure);
291
292         doReturn(true).when(mockSupport).handleSnapshotMessage(same(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT));
293         mockRaftActor.handleCommand(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT);
294
295         verify(mockSupport).handleSnapshotMessage(same(applySnapshot));
296         verify(mockSupport).handleSnapshotMessage(same(captureSnapshot));
297         verify(mockSupport).handleSnapshotMessage(same(captureSnapshotReply));
298         verify(mockSupport).handleSnapshotMessage(same(saveSnapshotSuccess));
299         verify(mockSupport).handleSnapshotMessage(same(saveSnapshotFailure));
300         verify(mockSupport).handleSnapshotMessage(same(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT));
301     }
302
303     @Test
304     public void testUpdatingElectionTermCallsDataPersistence() throws Exception {
305         new JavaTestKit(getSystem()) {
306             {
307                 String persistenceId = factory.generateActorId("leader-");
308
309                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
310
311                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
312
313                 CountDownLatch persistLatch = new CountDownLatch(1);
314                 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
315                 dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
316
317                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
318                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
319
320                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
321
322                 mockRaftActor.waitForInitializeBehaviorComplete();
323
324                 mockRaftActor.getRaftActorContext().getTermInformation().updateAndPersist(10, "foobar");
325
326                 assertEquals("Persist called", true, persistLatch.await(5, TimeUnit.SECONDS));
327             }
328         };
329     }
330
331     @Test
332     public void testAddingReplicatedLogEntryCallsDataPersistence() throws Exception {
333         new JavaTestKit(getSystem()) {
334             {
335                 String persistenceId = factory.generateActorId("leader-");
336
337                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
338
339                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
340
341                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
342
343                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
344                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
345
346                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
347
348                 mockRaftActor.waitForInitializeBehaviorComplete();
349
350                 MockRaftActorContext.MockReplicatedLogEntry logEntry = new MockRaftActorContext.MockReplicatedLogEntry(10, 10, mock(Payload.class));
351
352                 mockRaftActor.getRaftActorContext().getReplicatedLog().appendAndPersist(logEntry);
353
354                 verify(dataPersistenceProvider).persist(eq(logEntry), any(Procedure.class));
355             }
356         };
357     }
358
359     @Test
360     public void testRemovingReplicatedLogEntryCallsDataPersistence() throws Exception {
361         new JavaTestKit(getSystem()) {
362             {
363                 String persistenceId = factory.generateActorId("leader-");
364
365                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
366
367                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
368
369                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
370
371                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
372                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
373
374                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
375
376                 mockRaftActor.waitForInitializeBehaviorComplete();
377
378                 mockRaftActor.waitUntilLeader();
379
380                 mockRaftActor.getReplicatedLog().appendAndPersist(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, mock(Payload.class)));
381
382                 mockRaftActor.getRaftActorContext().getReplicatedLog().removeFromAndPersist(0);
383
384                 verify(dataPersistenceProvider, times(3)).persist(anyObject(), any(Procedure.class));
385             }
386         };
387     }
388
389     @Test
390     public void testApplyJournalEntriesCallsDataPersistence() throws Exception {
391         new JavaTestKit(getSystem()) {
392             {
393                 String persistenceId = factory.generateActorId("leader-");
394
395                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
396
397                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
398
399                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
400
401                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
402                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
403
404                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
405
406                 mockRaftActor.waitForInitializeBehaviorComplete();
407
408                 mockRaftActor.waitUntilLeader();
409
410                 mockRaftActor.onReceiveCommand(new ApplyJournalEntries(10));
411
412                 verify(dataPersistenceProvider, times(2)).persist(anyObject(), any(Procedure.class));
413
414             }
415
416         };
417     }
418
419     @Test
420     public void testApplyState() throws Exception {
421
422         new JavaTestKit(getSystem()) {
423             {
424                 String persistenceId = factory.generateActorId("leader-");
425
426                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
427
428                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
429
430                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
431
432                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
433                         Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
434
435                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
436
437                 mockRaftActor.waitForInitializeBehaviorComplete();
438
439                 ReplicatedLogEntry entry = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
440                         new MockRaftActorContext.MockPayload("F"));
441
442                 mockRaftActor.onReceiveCommand(new ApplyState(mockActorRef, "apply-state", entry));
443
444                 verify(mockRaftActor.actorDelegate).applyState(eq(mockActorRef), eq("apply-state"), anyObject());
445
446             }
447         };
448     }
449
450     @Test
451     public void testRaftRoleChangeNotifierWhenRaftActorHasNoPeers() throws Exception {
452         new JavaTestKit(getSystem()) {{
453             TestActorRef<MessageCollectorActor> notifierActor = factory.createTestActor(
454                     Props.create(MessageCollectorActor.class));
455             MessageCollectorActor.waitUntilReady(notifierActor);
456
457             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
458             long heartBeatInterval = 100;
459             config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS));
460             config.setElectionTimeoutFactor(20);
461
462             String persistenceId = factory.generateActorId("notifier-");
463
464             TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
465                     Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), notifierActor,
466                     new NonPersistentDataProvider()), persistenceId);
467
468             List<RoleChanged> matches =  MessageCollectorActor.expectMatching(notifierActor, RoleChanged.class, 3);
469
470
471             // check if the notifier got a role change from null to Follower
472             RoleChanged raftRoleChanged = matches.get(0);
473             assertEquals(persistenceId, raftRoleChanged.getMemberId());
474             assertNull(raftRoleChanged.getOldRole());
475             assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
476
477             // check if the notifier got a role change from Follower to Candidate
478             raftRoleChanged = matches.get(1);
479             assertEquals(persistenceId, raftRoleChanged.getMemberId());
480             assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
481             assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
482
483             // check if the notifier got a role change from Candidate to Leader
484             raftRoleChanged = matches.get(2);
485             assertEquals(persistenceId, raftRoleChanged.getMemberId());
486             assertEquals(RaftState.Candidate.name(), raftRoleChanged.getOldRole());
487             assertEquals(RaftState.Leader.name(), raftRoleChanged.getNewRole());
488
489             LeaderStateChanged leaderStateChange = MessageCollectorActor.expectFirstMatching(
490                     notifierActor, LeaderStateChanged.class);
491
492             assertEquals(raftRoleChanged.getMemberId(), leaderStateChange.getLeaderId());
493
494             notifierActor.underlyingActor().clear();
495
496             MockRaftActor raftActor = raftActorRef.underlyingActor();
497             final String newLeaderId = "new-leader";
498             Follower follower = new Follower(raftActor.getRaftActorContext()) {
499                 @Override
500                 public RaftActorBehavior handleMessage(ActorRef sender, Object message) {
501                     leaderId = newLeaderId;
502                     return this;
503                 }
504             };
505
506             raftActor.changeCurrentBehavior(follower);
507
508             leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
509             assertEquals(persistenceId, leaderStateChange.getMemberId());
510             assertEquals(null, leaderStateChange.getLeaderId());
511
512             raftRoleChanged = MessageCollectorActor.expectFirstMatching(notifierActor, RoleChanged.class);
513             assertEquals(RaftState.Leader.name(), raftRoleChanged.getOldRole());
514             assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
515
516             notifierActor.underlyingActor().clear();
517
518             raftActor.handleCommand("any");
519
520             leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
521             assertEquals(persistenceId, leaderStateChange.getMemberId());
522             assertEquals(newLeaderId, leaderStateChange.getLeaderId());
523         }};
524     }
525
526     @Test
527     public void testRaftRoleChangeNotifierWhenRaftActorHasPeers() throws Exception {
528         new JavaTestKit(getSystem()) {{
529             ActorRef notifierActor = factory.createActor(Props.create(MessageCollectorActor.class));
530             MessageCollectorActor.waitUntilReady(notifierActor);
531
532             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
533             long heartBeatInterval = 100;
534             config.setHeartBeatInterval(FiniteDuration.create(heartBeatInterval, TimeUnit.MILLISECONDS));
535             config.setElectionTimeoutFactor(1);
536
537             String persistenceId = factory.generateActorId("notifier-");
538
539             factory.createActor(MockRaftActor.props(persistenceId,
540                     ImmutableMap.of("leader", "fake/path"), Optional.<ConfigParams>of(config), notifierActor), persistenceId);
541
542             List<RoleChanged> matches =  null;
543             for(int i = 0; i < 5000 / heartBeatInterval; i++) {
544                 matches = MessageCollectorActor.getAllMatching(notifierActor, RoleChanged.class);
545                 assertNotNull(matches);
546                 if(matches.size() == 3) {
547                     break;
548                 }
549                 Uninterruptibles.sleepUninterruptibly(heartBeatInterval, TimeUnit.MILLISECONDS);
550             }
551
552             assertEquals(2, matches.size());
553
554             // check if the notifier got a role change from null to Follower
555             RoleChanged raftRoleChanged = matches.get(0);
556             assertEquals(persistenceId, raftRoleChanged.getMemberId());
557             assertNull(raftRoleChanged.getOldRole());
558             assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
559
560             // check if the notifier got a role change from Follower to Candidate
561             raftRoleChanged = matches.get(1);
562             assertEquals(persistenceId, raftRoleChanged.getMemberId());
563             assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
564             assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
565
566         }};
567     }
568
569     @Test
570     public void testFakeSnapshotsForLeaderWithInRealSnapshots() throws Exception {
571         new JavaTestKit(getSystem()) {
572             {
573                 String persistenceId = factory.generateActorId("leader-");
574                 String follower1Id = factory.generateActorId("follower-");
575
576                 ActorRef followerActor1 =
577                         factory.createActor(Props.create(MessageCollectorActor.class));
578
579                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
580                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
581                 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
582
583                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
584
585                 Map<String, String> peerAddresses = new HashMap<>();
586                 peerAddresses.put(follower1Id, followerActor1.path().toString());
587
588                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
589                         MockRaftActor.props(persistenceId, peerAddresses,
590                                 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
591
592                 MockRaftActor leaderActor = mockActorRef.underlyingActor();
593
594                 leaderActor.getRaftActorContext().setCommitIndex(4);
595                 leaderActor.getRaftActorContext().setLastApplied(4);
596                 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
597
598                 leaderActor.waitForInitializeBehaviorComplete();
599
600                 // create 8 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
601
602                 Leader leader = new Leader(leaderActor.getRaftActorContext());
603                 leaderActor.setCurrentBehavior(leader);
604                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
605
606                 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
607                 leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 8, 1).build());
608
609                 assertEquals(8, leaderActor.getReplicatedLog().size());
610
611                 leaderActor.getRaftActorContext().getSnapshotManager()
612                         .capture(new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
613                                 new MockRaftActorContext.MockPayload("x")), 4);
614
615                 verify(leaderActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class));
616
617                 assertEquals(8, leaderActor.getReplicatedLog().size());
618
619                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
620                 //fake snapshot on index 5
621                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 5, 1));
622
623                 assertEquals(8, leaderActor.getReplicatedLog().size());
624
625                 //fake snapshot on index 6
626                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
627                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 6, 1));
628                 assertEquals(8, leaderActor.getReplicatedLog().size());
629
630                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
631
632                 assertEquals(8, leaderActor.getReplicatedLog().size());
633
634                 ByteString snapshotBytes = fromObject(Arrays.asList(
635                         new MockRaftActorContext.MockPayload("foo-0"),
636                         new MockRaftActorContext.MockPayload("foo-1"),
637                         new MockRaftActorContext.MockPayload("foo-2"),
638                         new MockRaftActorContext.MockPayload("foo-3"),
639                         new MockRaftActorContext.MockPayload("foo-4")));
640
641                 leaderActor.getRaftActorContext().getSnapshotManager().persist(new NonPersistentDataProvider()
642                         , snapshotBytes.toByteArray(), leader, Runtime.getRuntime().totalMemory());
643
644                 assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
645
646                 // The commit is needed to complete the snapshot creation process
647                 leaderActor.getRaftActorContext().getSnapshotManager().commit(new NonPersistentDataProvider(), -1);
648
649                 // capture snapshot reply should remove the snapshotted entries only
650                 assertEquals(3, leaderActor.getReplicatedLog().size());
651                 assertEquals(7, leaderActor.getReplicatedLog().lastIndex());
652
653                 // add another non-replicated entry
654                 leaderActor.getReplicatedLog().append(
655                         new ReplicatedLogImplEntry(8, 1, new MockRaftActorContext.MockPayload("foo-8")));
656
657                 //fake snapshot on index 7, since lastApplied = 7 , we would keep the last applied
658                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 7, 1));
659                 assertEquals(2, leaderActor.getReplicatedLog().size());
660                 assertEquals(8, leaderActor.getReplicatedLog().lastIndex());
661
662             }
663         };
664     }
665
666     @Test
667     public void testFakeSnapshotsForFollowerWithInRealSnapshots() throws Exception {
668         new JavaTestKit(getSystem()) {
669             {
670                 String persistenceId = factory.generateActorId("follower-");
671                 String leaderId = factory.generateActorId("leader-");
672
673
674                 ActorRef leaderActor1 =
675                         factory.createActor(Props.create(MessageCollectorActor.class));
676
677                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
678                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
679                 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
680
681                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
682
683                 Map<String, String> peerAddresses = new HashMap<>();
684                 peerAddresses.put(leaderId, leaderActor1.path().toString());
685
686                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
687                         MockRaftActor.props(persistenceId, peerAddresses,
688                                 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
689
690                 MockRaftActor followerActor = mockActorRef.underlyingActor();
691                 followerActor.getRaftActorContext().setCommitIndex(4);
692                 followerActor.getRaftActorContext().setLastApplied(4);
693                 followerActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
694
695                 followerActor.waitForInitializeBehaviorComplete();
696
697
698                 Follower follower = new Follower(followerActor.getRaftActorContext());
699                 followerActor.setCurrentBehavior(follower);
700                 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
701
702                 // create 6 entries in the log - 0 to 4 are applied and will get picked up as part of the capture snapshot
703                 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
704                 followerActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(0, 6, 1).build());
705
706                 // log has indices 0-5
707                 assertEquals(6, followerActor.getReplicatedLog().size());
708
709                 //snapshot on 4
710                 followerActor.getRaftActorContext().getSnapshotManager().capture(
711                         new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
712                                 new MockRaftActorContext.MockPayload("D")), 4);
713
714                 verify(followerActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class));
715
716                 assertEquals(6, followerActor.getReplicatedLog().size());
717
718                 //fake snapshot on index 6
719                 List<ReplicatedLogEntry> entries =
720                         Arrays.asList(
721                                 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
722                                         new MockRaftActorContext.MockPayload("foo-6"))
723                         );
724                 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 5, 1, entries, 5, 5));
725                 assertEquals(7, followerActor.getReplicatedLog().size());
726
727                 //fake snapshot on index 7
728                 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
729
730                 entries =
731                         Arrays.asList(
732                                 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
733                                         new MockRaftActorContext.MockPayload("foo-7"))
734                         );
735                 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 6, 1, entries, 6, 6));
736                 assertEquals(8, followerActor.getReplicatedLog().size());
737
738                 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
739
740
741                 ByteString snapshotBytes = fromObject(Arrays.asList(
742                         new MockRaftActorContext.MockPayload("foo-0"),
743                         new MockRaftActorContext.MockPayload("foo-1"),
744                         new MockRaftActorContext.MockPayload("foo-2"),
745                         new MockRaftActorContext.MockPayload("foo-3"),
746                         new MockRaftActorContext.MockPayload("foo-4")));
747                 followerActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
748                 assertFalse(followerActor.getRaftActorContext().getSnapshotManager().isCapturing());
749
750                 // The commit is needed to complete the snapshot creation process
751                 followerActor.getRaftActorContext().getSnapshotManager().commit(new NonPersistentDataProvider(), -1);
752
753                 // capture snapshot reply should remove the snapshotted entries only till replicatedToAllIndex
754                 assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log
755                 assertEquals(7, followerActor.getReplicatedLog().lastIndex());
756
757                 entries =
758                         Arrays.asList(
759                                 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 8,
760                                         new MockRaftActorContext.MockPayload("foo-7"))
761                         );
762                 // send an additional entry 8 with leaderCommit = 7
763                 followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 7, 1, entries, 7, 7));
764
765                 // 7 and 8, as lastapplied is 7
766                 assertEquals(2, followerActor.getReplicatedLog().size());
767
768             }
769         };
770     }
771
772     @Test
773     public void testFakeSnapshotsForLeaderWithInInitiateSnapshots() throws Exception {
774         new JavaTestKit(getSystem()) {
775             {
776                 String persistenceId = factory.generateActorId("leader-");
777                 String follower1Id = factory.generateActorId("follower-");
778                 String follower2Id = factory.generateActorId("follower-");
779
780                 ActorRef followerActor1 =
781                         factory.createActor(Props.create(MessageCollectorActor.class), follower1Id);
782                 ActorRef followerActor2 =
783                         factory.createActor(Props.create(MessageCollectorActor.class), follower2Id);
784
785                 DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
786                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
787                 config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
788
789                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
790
791                 Map<String, String> peerAddresses = new HashMap<>();
792                 peerAddresses.put(follower1Id, followerActor1.path().toString());
793                 peerAddresses.put(follower2Id, followerActor2.path().toString());
794
795                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
796                         MockRaftActor.props(persistenceId, peerAddresses,
797                                 Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
798
799                 MockRaftActor leaderActor = mockActorRef.underlyingActor();
800                 leaderActor.getRaftActorContext().setCommitIndex(9);
801                 leaderActor.getRaftActorContext().setLastApplied(9);
802                 leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
803
804                 leaderActor.waitForInitializeBehaviorComplete();
805
806                 Leader leader = new Leader(leaderActor.getRaftActorContext());
807                 leaderActor.setCurrentBehavior(leader);
808                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
809
810                 // create 5 entries in the log
811                 MockRaftActorContext.MockReplicatedLogBuilder logBuilder = new MockRaftActorContext.MockReplicatedLogBuilder();
812                 leaderActor.getRaftActorContext().setReplicatedLog(logBuilder.createEntries(5, 10, 1).build());
813
814                 //set the snapshot index to 4 , 0 to 4 are snapshotted
815                 leaderActor.getRaftActorContext().getReplicatedLog().setSnapshotIndex(4);
816                 //setting replicatedToAllIndex = 9, for the log to clear
817                 leader.setReplicatedToAllIndex(9);
818                 assertEquals(5, leaderActor.getReplicatedLog().size());
819                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
820
821                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 9, 1));
822                 assertEquals(5, leaderActor.getReplicatedLog().size());
823                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
824
825                 // set the 2nd follower nextIndex to 1 which has been snapshotted
826                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 0, 1));
827                 assertEquals(5, leaderActor.getReplicatedLog().size());
828                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
829
830                 // simulate a real snapshot
831                 leaderActor.onReceiveCommand(new SendHeartBeat());
832                 assertEquals(5, leaderActor.getReplicatedLog().size());
833                 assertEquals(String.format("expected to be Leader but was %s. Current Leader = %s ",
834                         leaderActor.getCurrentBehavior().state(), leaderActor.getLeaderId())
835                         , RaftState.Leader, leaderActor.getCurrentBehavior().state());
836
837
838                 //reply from a slow follower does not initiate a fake snapshot
839                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 9, 1));
840                 assertEquals("Fake snapshot should not happen when Initiate is in progress", 5, leaderActor.getReplicatedLog().size());
841
842                 ByteString snapshotBytes = fromObject(Arrays.asList(
843                         new MockRaftActorContext.MockPayload("foo-0"),
844                         new MockRaftActorContext.MockPayload("foo-1"),
845                         new MockRaftActorContext.MockPayload("foo-2"),
846                         new MockRaftActorContext.MockPayload("foo-3"),
847                         new MockRaftActorContext.MockPayload("foo-4")));
848                 leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
849                 assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
850
851                 assertEquals("Real snapshot didn't clear the log till replicatedToAllIndex", 0, leaderActor.getReplicatedLog().size());
852
853                 //reply from a slow follower after should not raise errors
854                 leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 5, 1));
855                 assertEquals(0, leaderActor.getReplicatedLog().size());
856             }
857         };
858     }
859
860     @Test
861     public void testRealSnapshotWhenReplicatedToAllIndexMinusOne() throws Exception {
862         new JavaTestKit(getSystem()) {{
863             String persistenceId = factory.generateActorId("leader-");
864             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
865             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
866             config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
867             config.setSnapshotBatchCount(5);
868
869             DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider();
870
871             Map<String, String> peerAddresses = new HashMap<>();
872
873             TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
874                     MockRaftActor.props(persistenceId, peerAddresses,
875                             Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
876
877             MockRaftActor leaderActor = mockActorRef.underlyingActor();
878             leaderActor.getRaftActorContext().setCommitIndex(3);
879             leaderActor.getRaftActorContext().setLastApplied(3);
880             leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
881
882             leaderActor.waitForInitializeBehaviorComplete();
883             for(int i=0;i< 4;i++) {
884                 leaderActor.getReplicatedLog()
885                         .append(new MockRaftActorContext.MockReplicatedLogEntry(1, i,
886                                 new MockRaftActorContext.MockPayload("A")));
887             }
888
889             Leader leader = new Leader(leaderActor.getRaftActorContext());
890             leaderActor.setCurrentBehavior(leader);
891             assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
892
893             // Persist another entry (this will cause a CaptureSnapshot to be triggered
894             leaderActor.persistData(mockActorRef, "x", new MockRaftActorContext.MockPayload("duh"));
895
896             // Now send a CaptureSnapshotReply
897             mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
898
899             // Trimming log in this scenario is a no-op
900             assertEquals(-1, leaderActor.getReplicatedLog().getSnapshotIndex());
901             assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
902             assertEquals(-1, leader.getReplicatedToAllIndex());
903
904         }};
905     }
906
907     @Test
908     public void testRealSnapshotWhenReplicatedToAllIndexNotInReplicatedLog() throws Exception {
909         new JavaTestKit(getSystem()) {{
910             String persistenceId = factory.generateActorId("leader-");
911             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
912             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
913             config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
914             config.setSnapshotBatchCount(5);
915
916             DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider();
917
918             Map<String, String> peerAddresses = new HashMap<>();
919
920             TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
921                     MockRaftActor.props(persistenceId, peerAddresses,
922                             Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
923
924             MockRaftActor leaderActor = mockActorRef.underlyingActor();
925             leaderActor.getRaftActorContext().setCommitIndex(3);
926             leaderActor.getRaftActorContext().setLastApplied(3);
927             leaderActor.getRaftActorContext().getTermInformation().update(1, persistenceId);
928             leaderActor.getReplicatedLog().setSnapshotIndex(3);
929
930             leaderActor.waitForInitializeBehaviorComplete();
931             Leader leader = new Leader(leaderActor.getRaftActorContext());
932             leaderActor.setCurrentBehavior(leader);
933             leader.setReplicatedToAllIndex(3);
934             assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
935
936             // Persist another entry (this will cause a CaptureSnapshot to be triggered
937             leaderActor.persistData(mockActorRef, "x", new MockRaftActorContext.MockPayload("duh"));
938
939             // Now send a CaptureSnapshotReply
940             mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
941
942             // Trimming log in this scenario is a no-op
943             assertEquals(3, leaderActor.getReplicatedLog().getSnapshotIndex());
944             assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
945             assertEquals(3, leader.getReplicatedToAllIndex());
946
947         }};
948     }
949
950     private ByteString fromObject(Object snapshot) throws Exception {
951         ByteArrayOutputStream b = null;
952         ObjectOutputStream o = null;
953         try {
954             b = new ByteArrayOutputStream();
955             o = new ObjectOutputStream(b);
956             o.writeObject(snapshot);
957             byte[] snapshotBytes = b.toByteArray();
958             return ByteString.copyFrom(snapshotBytes);
959         } finally {
960             if (o != null) {
961                 o.flush();
962                 o.close();
963             }
964             if (b != null) {
965                 b.close();
966             }
967         }
968     }
969
970 }