BUG 5656 : Entity ownership candidates not removed consistently on leadership change
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / entityownership / DistributedEntityOwnershipIntegrationTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.entityownership;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertNull;
12 import static org.junit.Assert.assertThat;
13 import static org.junit.Assert.assertTrue;
14 import static org.mockito.AdditionalMatchers.or;
15 import static org.mockito.Mockito.doNothing;
16 import static org.mockito.Mockito.never;
17 import static org.mockito.Mockito.reset;
18 import static org.mockito.Mockito.timeout;
19 import static org.mockito.Mockito.verify;
20 import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyRaftState;
21 import static org.opendaylight.controller.cluster.datastore.entityownership.AbstractEntityOwnershipTest.ownershipChange;
22 import static org.opendaylight.controller.cluster.datastore.entityownership.DistributedEntityOwnershipService.ENTITY_OWNERSHIP_SHARD_NAME;
23 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.CANDIDATE_NAME_NODE_ID;
24 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.entityPath;
25 import akka.actor.ActorRef;
26 import akka.actor.Status.Failure;
27 import akka.actor.Status.Success;
28 import akka.cluster.Cluster;
29 import akka.testkit.JavaTestKit;
30 import com.google.common.base.Optional;
31 import com.google.common.base.Stopwatch;
32 import com.google.common.collect.Iterables;
33 import com.google.common.util.concurrent.Uninterruptibles;
34 import java.util.ArrayList;
35 import java.util.Arrays;
36 import java.util.Collection;
37 import java.util.List;
38 import java.util.concurrent.TimeUnit;
39 import org.junit.After;
40 import org.junit.Before;
41 import org.junit.Test;
42 import org.mockito.ArgumentCaptor;
43 import org.mockito.Mock;
44 import org.mockito.Mockito;
45 import org.mockito.MockitoAnnotations;
46 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
47 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
48 import org.opendaylight.controller.cluster.datastore.IntegrationTestKit;
49 import org.opendaylight.controller.cluster.datastore.MemberNode;
50 import org.opendaylight.controller.cluster.datastore.entityownership.selectionstrategy.EntityOwnerSelectionStrategyConfig;
51 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
52 import org.opendaylight.controller.cluster.raft.RaftState;
53 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
54 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
55 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
56 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
57 import org.opendaylight.controller.md.sal.common.api.clustering.CandidateAlreadyRegisteredException;
58 import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
59 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidateRegistration;
60 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipChange;
61 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener;
62 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
63 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipState;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.clustering.entity.owners.rev150804.entity.owners.entity.type.entity.Candidate;
65 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
66 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
67 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
68 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
69
70 /**
71  * End-to-end integration tests for the entity ownership functionality.
72  *
73  * @author Thomas Pantelis
74  */
75 public class DistributedEntityOwnershipIntegrationTest {
76     private static final String MODULE_SHARDS_CONFIG = "module-shards-default.conf";
77     private static final String MODULE_SHARDS_5_NODE_CONFIG = "module-shards-default-5-node.conf";
78     private static final String MODULE_SHARDS_MEMBER_1_CONFIG = "module-shards-default-member-1.conf";
79     private static final String ENTITY_TYPE1 = "entityType1";
80     private static final String ENTITY_TYPE2 = "entityType2";
81     private static final Entity ENTITY1 = new Entity(ENTITY_TYPE1, "entity1");
82     private static final Entity ENTITY1_2 = new Entity(ENTITY_TYPE2, "entity1");
83     private static final Entity ENTITY2 = new Entity(ENTITY_TYPE1, "entity2");
84     private static final Entity ENTITY3 = new Entity(ENTITY_TYPE1, "entity3");
85     private static final Entity ENTITY4 = new Entity(ENTITY_TYPE1, "entity4");
86     private static final SchemaContext SCHEMA_CONTEXT = SchemaContextHelper.entityOwners();
87
88     private final DatastoreContext.Builder leaderDatastoreContextBuilder =
89             DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5).
90                     shardIsolatedLeaderCheckIntervalInMillis(1000000);
91
92     private final DatastoreContext.Builder followerDatastoreContextBuilder =
93             DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(10000);
94
95     private final List<MemberNode> memberNodes = new ArrayList<>();
96
97     @Mock
98     private EntityOwnershipListener leaderMockListener;
99
100     @Mock
101     private EntityOwnershipListener leaderMockListener2;
102
103     @Mock
104     private EntityOwnershipListener follower1MockListener;
105
106     @Mock
107     private EntityOwnershipListener follower2MockListener;
108
109     @Before
110     public void setUp() {
111         MockitoAnnotations.initMocks(this);
112         InMemoryJournal.clear();
113         InMemorySnapshotStore.clear();
114     }
115
116     @After
117     public void tearDown() {
118         for(MemberNode m: memberNodes) {
119             m.cleanup();
120         }
121     }
122
123     private static DistributedEntityOwnershipService newOwnershipService(final DistributedDataStore datastore) {
124         return DistributedEntityOwnershipService.start(datastore.getActorContext(),
125                 EntityOwnerSelectionStrategyConfig.newBuilder().build());
126     }
127
128     @Test
129     public void testFunctionalityWithThreeNodes() throws Exception {
130         String name = "test";
131         MemberNode leaderNode = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name ).
132                 moduleShardsConfig(MODULE_SHARDS_CONFIG).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false).
133                 datastoreContextBuilder(leaderDatastoreContextBuilder).build();
134
135         MemberNode follower1Node = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name ).
136                 moduleShardsConfig(MODULE_SHARDS_CONFIG).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false).
137                 datastoreContextBuilder(followerDatastoreContextBuilder).build();
138
139         MemberNode follower2Node = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name ).
140                 moduleShardsConfig(MODULE_SHARDS_CONFIG).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false).
141                 datastoreContextBuilder(followerDatastoreContextBuilder).build();
142
143         DistributedDataStore leaderDistributedDataStore = leaderNode.configDataStore();
144
145         leaderDistributedDataStore.waitTillReady();
146         follower1Node.configDataStore().waitTillReady();
147         follower2Node.configDataStore().waitTillReady();
148
149         EntityOwnershipService leaderEntityOwnershipService = newOwnershipService(leaderDistributedDataStore);
150         EntityOwnershipService follower1EntityOwnershipService = newOwnershipService(follower1Node.configDataStore());
151         EntityOwnershipService follower2EntityOwnershipService = newOwnershipService(follower2Node.configDataStore());
152
153         leaderNode.kit().waitUntilLeader(leaderNode.configDataStore().getActorContext(), ENTITY_OWNERSHIP_SHARD_NAME);
154
155         leaderEntityOwnershipService.registerListener(ENTITY_TYPE1, leaderMockListener);
156         leaderEntityOwnershipService.registerListener(ENTITY_TYPE2, leaderMockListener2);
157         follower1EntityOwnershipService.registerListener(ENTITY_TYPE1, follower1MockListener);
158
159         // Register leader candidate for entity1 and verify it becomes owner
160
161         leaderEntityOwnershipService.registerCandidate(ENTITY1);
162         verify(leaderMockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY1, false, true, true));
163         verify(follower1MockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY1, false, false, true));
164         reset(leaderMockListener, follower1MockListener);
165
166         verifyGetOwnershipState(leaderEntityOwnershipService, ENTITY1, true, true);
167         verifyGetOwnershipState(follower1EntityOwnershipService, ENTITY1, false, true);
168
169         // Register leader candidate for entity1_2 (same id, different type) and verify it becomes owner
170
171         leaderEntityOwnershipService.registerCandidate(ENTITY1_2);
172         verify(leaderMockListener2, timeout(5000)).ownershipChanged(ownershipChange(ENTITY1_2, false, true, true));
173         Uninterruptibles.sleepUninterruptibly(300, TimeUnit.MILLISECONDS);
174         verify(leaderMockListener, never()).ownershipChanged(ownershipChange(ENTITY1_2));
175         reset(leaderMockListener2);
176
177         // Register follower1 candidate for entity1 and verify it gets added but doesn't become owner
178
179         follower1EntityOwnershipService.registerCandidate(ENTITY1);
180         verifyCandidates(leaderDistributedDataStore, ENTITY1, "member-1", "member-2");
181         verifyOwner(leaderDistributedDataStore, ENTITY1, "member-1");
182         Uninterruptibles.sleepUninterruptibly(300, TimeUnit.MILLISECONDS);
183         verify(leaderMockListener, never()).ownershipChanged(ownershipChange(ENTITY1));
184         verify(follower1MockListener, never()).ownershipChanged(ownershipChange(ENTITY1));
185
186         // Register follower1 candidate for entity2 and verify it becomes owner
187
188         EntityOwnershipCandidateRegistration follower1Entity2Reg = follower1EntityOwnershipService.registerCandidate(ENTITY2);
189         verify(follower1MockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY2, false, true, true));
190         verify(leaderMockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY2, false, false, true));
191         reset(leaderMockListener, follower1MockListener);
192
193         // Register follower2 candidate for entity2 and verify it gets added but doesn't become owner
194
195         follower2EntityOwnershipService.registerListener(ENTITY_TYPE1, follower2MockListener);
196         verify(follower2MockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY2, false, false, true));
197         verify(follower2MockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY1, false, false, true));
198
199         follower2EntityOwnershipService.registerCandidate(ENTITY2);
200         verifyCandidates(leaderDistributedDataStore, ENTITY2, "member-2", "member-3");
201         verifyOwner(leaderDistributedDataStore, ENTITY2, "member-2");
202
203         // Unregister follower1 candidate for entity2 and verify follower2 becomes owner
204
205         follower1Entity2Reg.close();
206         verifyCandidates(leaderDistributedDataStore, ENTITY2, "member-3");
207         verifyOwner(leaderDistributedDataStore, ENTITY2, "member-3");
208         verify(follower1MockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY2, true, false, true));
209         verify(leaderMockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY2, false, false, true));
210
211         // Depending on timing, follower2MockListener could get ownershipChanged with "false, false, true" if
212         // if the original ownership change with "member-2 is replicated to follower2 after the listener is
213         // registered.
214         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
215         verify(follower2MockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY2, false, true, true));
216
217         // Register follower1 candidate for entity3 and verify it becomes owner
218
219         follower1EntityOwnershipService.registerCandidate(ENTITY3);
220         verifyOwner(leaderDistributedDataStore, ENTITY3, "member-2");
221         verify(follower1MockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY3, false, true, true));
222         verify(follower2MockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY3, false, false, true));
223         verify(leaderMockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY3, false, false, true));
224
225         // Register follower2 candidate for entity4 and verify it becomes owner
226
227         follower2EntityOwnershipService.registerCandidate(ENTITY4);
228         verifyOwner(leaderDistributedDataStore, ENTITY4, "member-3");
229         verify(follower2MockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY4, false, true, true));
230         verify(follower1MockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY4, false, false, true));
231         verify(leaderMockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY4, false, false, true));
232         reset(follower1MockListener, follower2MockListener);
233
234         // Register follower1 candidate for entity4 and verify it gets added but doesn't become owner
235
236         follower1EntityOwnershipService.registerCandidate(ENTITY4);
237         verifyCandidates(leaderDistributedDataStore, ENTITY4, "member-3", "member-2");
238         verifyOwner(leaderDistributedDataStore, ENTITY4, "member-3");
239
240         // Shutdown follower2 and verify it's owned entities (entity 2 & 4) get re-assigned
241
242         reset(leaderMockListener, follower1MockListener);
243         follower2Node.cleanup();
244
245         verify(follower1MockListener, timeout(15000).times(2)).ownershipChanged(or(ownershipChange(ENTITY4, false, true, true),
246                 ownershipChange(ENTITY2, false, false, false)));
247         verify(leaderMockListener, timeout(15000).times(2)).ownershipChanged(or(ownershipChange(ENTITY4, false, false, true),
248                 ownershipChange(ENTITY2, false, false, false)));
249         verifyOwner(leaderDistributedDataStore, ENTITY2, ""); // no other candidate
250
251         // Register leader candidate for entity2 and verify it becomes owner
252
253         EntityOwnershipCandidateRegistration leaderEntity2Reg = leaderEntityOwnershipService.registerCandidate(ENTITY2);
254         verify(leaderMockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY2, false, true, true));
255         verifyOwner(leaderDistributedDataStore, ENTITY2, "member-1");
256
257         // Unregister leader candidate for entity2 and verify the owner is cleared
258
259         leaderEntity2Reg.close();
260         verifyOwner(leaderDistributedDataStore, ENTITY2, "");
261         verify(leaderMockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY2, true, false, false));
262         verify(follower1MockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY2, false, false, false));
263     }
264
265     @Test
266     public void testLeaderCandidatesRemovedAfterShutdown() throws Exception {
267         followerDatastoreContextBuilder.shardElectionTimeoutFactor(5).
268                     customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
269
270         String name = "test";
271         MemberNode leaderNode = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name ).
272                 moduleShardsConfig(MODULE_SHARDS_CONFIG).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false).
273                 datastoreContextBuilder(leaderDatastoreContextBuilder).build();
274
275         MemberNode follower1Node = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name ).
276                 moduleShardsConfig(MODULE_SHARDS_CONFIG).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false).
277                 datastoreContextBuilder(followerDatastoreContextBuilder).build();
278
279         MemberNode follower2Node = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name ).
280                 moduleShardsConfig(MODULE_SHARDS_CONFIG).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false).
281                 datastoreContextBuilder(followerDatastoreContextBuilder).build();
282
283         DistributedDataStore leaderDistributedDataStore = leaderNode.configDataStore();
284
285         leaderDistributedDataStore.waitTillReady();
286         follower1Node.configDataStore().waitTillReady();
287         follower2Node.configDataStore().waitTillReady();
288
289         EntityOwnershipService leaderEntityOwnershipService = newOwnershipService(leaderDistributedDataStore);
290         EntityOwnershipService follower1EntityOwnershipService = newOwnershipService(follower1Node.configDataStore());
291         EntityOwnershipService follower2EntityOwnershipService = newOwnershipService(follower2Node.configDataStore());
292
293         leaderNode.kit().waitUntilLeader(leaderNode.configDataStore().getActorContext(), ENTITY_OWNERSHIP_SHARD_NAME);
294
295         // Register follower1 candidate for entity1 and verify it becomes owner
296
297         follower1EntityOwnershipService.registerCandidate(ENTITY1);
298         verifyOwner(leaderDistributedDataStore, ENTITY1, "member-2");
299
300         // Register leader candidate for entity1
301
302         leaderEntityOwnershipService.registerCandidate(ENTITY1);
303         verifyCandidates(leaderDistributedDataStore, ENTITY1, "member-2", "member-1");
304         verifyOwner(leaderDistributedDataStore, ENTITY1, "member-2");
305
306         // Register leader candidate for entity2 and verify it becomes owner
307
308         leaderEntityOwnershipService.registerCandidate(ENTITY2);
309         verifyOwner(leaderDistributedDataStore, ENTITY2, "member-1");
310
311         // Register follower2 candidate for entity2
312
313         follower2EntityOwnershipService.registerCandidate(ENTITY2);
314         verifyCandidates(leaderDistributedDataStore, ENTITY2, "member-1", "member-3");
315         verifyOwner(leaderDistributedDataStore, ENTITY2, "member-1");
316
317         // Shutdown the leader and verify its removed from the candidate list
318
319         leaderNode.cleanup();
320         follower1Node.waitForMemberDown("member-1");
321
322         // Re-enable elections on follower1 so it becomes the leader
323
324         ActorRef follower1Shard = IntegrationTestKit.findLocalShard(follower1Node.configDataStore().
325                 getActorContext(), ENTITY_OWNERSHIP_SHARD_NAME);
326         follower1Shard.tell(DatastoreContext.newBuilderFrom(followerDatastoreContextBuilder.build()).
327                 customRaftPolicyImplementation(null).build(), ActorRef.noSender());
328
329         MemberNode.verifyRaftState(follower1Node.configDataStore(), ENTITY_OWNERSHIP_SHARD_NAME,
330             raftState -> assertEquals("Raft state", RaftState.Leader.toString(), raftState.getRaftState()));
331
332         // Verify the prior leader's candidates are removed
333
334         verifyCandidates(follower1Node.configDataStore(), ENTITY1, "member-2");
335         verifyCandidates(follower1Node.configDataStore(), ENTITY2, "member-3");
336         verifyOwner(follower1Node.configDataStore(), ENTITY1, "member-2");
337         verifyOwner(follower1Node.configDataStore(), ENTITY2, "member-3");
338     }
339
340     @Test
341     public void testLeaderAndFollowerCandidatesRemovedAfterShutdown() throws Exception {
342         followerDatastoreContextBuilder.shardElectionTimeoutFactor(5).
343                 customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
344
345         String name = "test";
346         MemberNode leaderNode = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name ).
347                 moduleShardsConfig(MODULE_SHARDS_5_NODE_CONFIG).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false).
348                 datastoreContextBuilder(leaderDatastoreContextBuilder).build();
349
350         MemberNode follower1Node = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name ).
351                 moduleShardsConfig(MODULE_SHARDS_5_NODE_CONFIG).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false).
352                 datastoreContextBuilder(followerDatastoreContextBuilder).build();
353
354         MemberNode follower2Node = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name ).
355                 moduleShardsConfig(MODULE_SHARDS_5_NODE_CONFIG).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false).
356                 datastoreContextBuilder(followerDatastoreContextBuilder).build();
357
358         MemberNode follower3Node = MemberNode.builder(memberNodes).akkaConfig("Member4").testName(name ).
359                 moduleShardsConfig(MODULE_SHARDS_5_NODE_CONFIG).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false).
360                 datastoreContextBuilder(followerDatastoreContextBuilder).build();
361
362         MemberNode follower4Node = MemberNode.builder(memberNodes).akkaConfig("Member5").testName(name ).
363                 moduleShardsConfig(MODULE_SHARDS_5_NODE_CONFIG).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false).
364                 datastoreContextBuilder(followerDatastoreContextBuilder).build();
365
366         DistributedDataStore leaderDistributedDataStore = leaderNode.configDataStore();
367
368         leaderDistributedDataStore.waitTillReady();
369         follower1Node.configDataStore().waitTillReady();
370         follower2Node.configDataStore().waitTillReady();
371
372         EntityOwnershipService leaderEntityOwnershipService = newOwnershipService(leaderDistributedDataStore);
373         EntityOwnershipService follower1EntityOwnershipService = newOwnershipService(follower1Node.configDataStore());
374         EntityOwnershipService follower2EntityOwnershipService = newOwnershipService(follower2Node.configDataStore());
375         EntityOwnershipService follower3EntityOwnershipService = newOwnershipService(follower3Node.configDataStore());
376         EntityOwnershipService follower4EntityOwnershipService = newOwnershipService(follower4Node.configDataStore());
377
378         leaderNode.kit().waitUntilLeader(leaderNode.configDataStore().getActorContext(), ENTITY_OWNERSHIP_SHARD_NAME);
379
380         // Register follower1 candidate for entity1 and verify it becomes owner
381
382         follower1EntityOwnershipService.registerCandidate(ENTITY1);
383         verifyOwner(leaderDistributedDataStore, ENTITY1, "member-2");
384
385         // Register leader candidate for entity1
386
387         leaderEntityOwnershipService.registerCandidate(ENTITY1);
388         verifyCandidates(leaderDistributedDataStore, ENTITY1, "member-2", "member-1");
389         verifyOwner(leaderDistributedDataStore, ENTITY1, "member-2");
390
391         // Register leader candidate for entity2 and verify it becomes owner
392
393         leaderEntityOwnershipService.registerCandidate(ENTITY2);
394         verifyOwner(leaderDistributedDataStore, ENTITY2, "member-1");
395
396         // Register follower2 candidate for entity2
397
398         follower2EntityOwnershipService.registerCandidate(ENTITY2);
399         verifyCandidates(leaderDistributedDataStore, ENTITY2, "member-1", "member-3");
400         verifyOwner(leaderDistributedDataStore, ENTITY2, "member-1");
401
402         // Register follower3 as a candidate for entity2 as well
403
404         follower3EntityOwnershipService.registerCandidate(ENTITY2);
405         verifyCandidates(leaderDistributedDataStore, ENTITY2, "member-1", "member-3", "member-4");
406         verifyOwner(leaderDistributedDataStore, ENTITY2, "member-1");
407
408
409         // Shutdown the leader and verify its removed from the candidate list
410
411         leaderNode.cleanup();
412         follower3Node.cleanup();
413
414         follower1Node.waitForMemberDown("member-1");
415         follower1Node.waitForMemberDown("member-4");
416
417         // Re-enable elections on follower1 so it becomes the leader
418
419         ActorRef follower1Shard = IntegrationTestKit.findLocalShard(follower1Node.configDataStore().
420                 getActorContext(), ENTITY_OWNERSHIP_SHARD_NAME);
421         follower1Shard.tell(DatastoreContext.newBuilderFrom(followerDatastoreContextBuilder.build()).
422                 customRaftPolicyImplementation(null).build(), ActorRef.noSender());
423
424         MemberNode.verifyRaftState(follower1Node.configDataStore(), ENTITY_OWNERSHIP_SHARD_NAME,
425                 raftState -> assertEquals("Raft state", RaftState.Leader.toString(), raftState.getRaftState()));
426
427         // Verify the prior leader's and follower3 candidates are removed
428
429         verifyCandidates(follower1Node.configDataStore(), ENTITY1, "member-2");
430         verifyCandidates(follower1Node.configDataStore(), ENTITY2, "member-3");
431         verifyOwner(follower1Node.configDataStore(), ENTITY1, "member-2");
432         verifyOwner(follower1Node.configDataStore(), ENTITY2, "member-3");
433     }
434
435     /**
436      * Reproduces bug <a href="https://bugs.opendaylight.org/show_bug.cgi?id=4554">4554</a>
437      *
438      * @throws CandidateAlreadyRegisteredException
439      */
440     @Test
441     public void testCloseCandidateRegistrationInQuickSuccession() throws CandidateAlreadyRegisteredException {
442         String name = "testCloseCandidateRegistrationInQuickSuccession";
443         MemberNode leaderNode = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name ).
444                 moduleShardsConfig(MODULE_SHARDS_CONFIG).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false).
445                 datastoreContextBuilder(leaderDatastoreContextBuilder).build();
446
447         MemberNode follower1Node = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name ).
448                 moduleShardsConfig(MODULE_SHARDS_CONFIG).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false).
449                 datastoreContextBuilder(followerDatastoreContextBuilder).build();
450
451         MemberNode follower2Node = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name ).
452                 moduleShardsConfig(MODULE_SHARDS_CONFIG).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false).
453                 datastoreContextBuilder(followerDatastoreContextBuilder).build();
454
455         DistributedDataStore leaderDistributedDataStore = leaderNode.configDataStore();
456
457         leaderDistributedDataStore.waitTillReady();
458         follower1Node.configDataStore().waitTillReady();
459         follower2Node.configDataStore().waitTillReady();
460
461         EntityOwnershipService leaderEntityOwnershipService = newOwnershipService(leaderDistributedDataStore);
462         EntityOwnershipService follower1EntityOwnershipService = newOwnershipService(follower1Node.configDataStore());
463         EntityOwnershipService follower2EntityOwnershipService = newOwnershipService(follower2Node.configDataStore());
464
465         leaderNode.kit().waitUntilLeader(leaderNode.configDataStore().getActorContext(), ENTITY_OWNERSHIP_SHARD_NAME);
466
467         leaderEntityOwnershipService.registerListener(ENTITY_TYPE1, leaderMockListener);
468         follower1EntityOwnershipService.registerListener(ENTITY_TYPE1, follower1MockListener);
469         follower2EntityOwnershipService.registerListener(ENTITY_TYPE1, follower2MockListener);
470
471         final EntityOwnershipCandidateRegistration candidate1 = leaderEntityOwnershipService.registerCandidate(ENTITY1);
472         final EntityOwnershipCandidateRegistration candidate2 = follower1EntityOwnershipService.registerCandidate(ENTITY1);
473         final EntityOwnershipCandidateRegistration candidate3 = follower2EntityOwnershipService.registerCandidate(ENTITY1);
474
475         verify(leaderMockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY1, false, true, true));
476
477         Mockito.reset(leaderMockListener);
478
479         ArgumentCaptor<EntityOwnershipChange> leaderChangeCaptor = ArgumentCaptor.forClass(EntityOwnershipChange.class);
480         ArgumentCaptor<EntityOwnershipChange> follower1ChangeCaptor = ArgumentCaptor.forClass(EntityOwnershipChange.class);
481         ArgumentCaptor<EntityOwnershipChange> follower2ChangeCaptor = ArgumentCaptor.forClass(EntityOwnershipChange.class);
482         doNothing().when(leaderMockListener).ownershipChanged(leaderChangeCaptor.capture());
483         doNothing().when(follower1MockListener).ownershipChanged(follower1ChangeCaptor.capture());
484         doNothing().when(follower2MockListener).ownershipChanged(follower2ChangeCaptor.capture());
485
486         candidate1.close();
487         candidate2.close();
488         candidate3.close();
489
490         boolean passed = false;
491         for(int i=0;i<100;i++) {
492             Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
493             if(!leaderEntityOwnershipService.getOwnershipState(ENTITY1).isPresent() ||
494                     !leaderEntityOwnershipService.getOwnershipState(ENTITY1).get().hasOwner() &&
495                     follower1EntityOwnershipService.getOwnershipState(ENTITY1).isPresent() &&
496                     !follower1EntityOwnershipService.getOwnershipState(ENTITY1).get().hasOwner() &&
497                     follower2EntityOwnershipService.getOwnershipState(ENTITY1).isPresent() &&
498                     !follower2EntityOwnershipService.getOwnershipState(ENTITY1).get().hasOwner() &&
499                     leaderChangeCaptor.getAllValues().size() > 0 && !leaderChangeCaptor.getValue().hasOwner() &&
500                     leaderChangeCaptor.getAllValues().size() > 0 && !follower1ChangeCaptor.getValue().hasOwner() &&
501                     leaderChangeCaptor.getAllValues().size() > 0 && !follower2ChangeCaptor.getValue().hasOwner()) {
502                 passed = true;
503                 break;
504             }
505         }
506
507         assertTrue("No ownership change message was sent with hasOwner=false", passed);
508     }
509
510     /**
511      * Tests bootstrapping the entity-ownership shard when there's no shards initially configured for local
512      * member. The entity-ownership shard is initially created as inactive (ie remains a follower), requiring
513      * an AddShardReplica request to join it to an existing leader.
514      */
515     @Test
516     public void testEntityOwnershipShardBootstrapping() throws Throwable {
517         String name = "testEntityOwnershipShardBootstrapping";
518         String moduleShardsConfig = MODULE_SHARDS_MEMBER_1_CONFIG;
519         MemberNode leaderNode = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name ).
520                 moduleShardsConfig(moduleShardsConfig).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false).
521                 datastoreContextBuilder(leaderDatastoreContextBuilder).build();
522
523         DistributedDataStore leaderDistributedDataStore = leaderNode.configDataStore();
524         EntityOwnershipService leaderEntityOwnershipService = newOwnershipService(leaderDistributedDataStore);
525
526         leaderNode.kit().waitUntilLeader(leaderNode.configDataStore().getActorContext(), ENTITY_OWNERSHIP_SHARD_NAME);
527
528         MemberNode follower1Node = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name ).
529                 moduleShardsConfig(moduleShardsConfig).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false).
530                 datastoreContextBuilder(followerDatastoreContextBuilder).build();
531
532         DistributedDataStore follower1DistributedDataStore = follower1Node.configDataStore();
533         follower1DistributedDataStore.waitTillReady();
534
535         leaderNode.waitForMembersUp("member-2");
536         follower1Node.waitForMembersUp("member-1");
537
538         EntityOwnershipService follower1EntityOwnershipService = newOwnershipService(follower1DistributedDataStore);
539
540         leaderEntityOwnershipService.registerListener(ENTITY_TYPE1, leaderMockListener);
541
542         // Register a candidate for follower1 - should get queued since follower1 has no leader
543         EntityOwnershipCandidateRegistration candidateReg = follower1EntityOwnershipService.registerCandidate(ENTITY1);
544         Uninterruptibles.sleepUninterruptibly(300, TimeUnit.MILLISECONDS);
545         verify(leaderMockListener, never()).ownershipChanged(ownershipChange(ENTITY1));
546
547         // Add replica in follower1
548         AddShardReplica addReplica = new AddShardReplica(ENTITY_OWNERSHIP_SHARD_NAME);
549         follower1DistributedDataStore.getActorContext().getShardManager().tell(addReplica , follower1Node.kit().getRef());
550         Object reply = follower1Node.kit().expectMsgAnyClassOf(JavaTestKit.duration("5 sec"), Success.class, Failure.class);
551         if(reply instanceof Failure) {
552             throw new AssertionError("AddShardReplica failed", ((Failure)reply).cause());
553         }
554
555         // The queued candidate registration should proceed
556         verify(leaderMockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY1, false, false, true));
557         reset(leaderMockListener);
558
559         candidateReg.close();
560         verify(leaderMockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY1, false, false, false));
561         reset(leaderMockListener);
562
563         // Restart follower1 and verify the entity ownership shard is re-instated by registering.
564         Cluster.get(leaderNode.kit().getSystem()).down(Cluster.get(follower1Node.kit().getSystem()).selfAddress());
565         follower1Node.cleanup();
566
567         follower1Node = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name ).
568                 moduleShardsConfig(moduleShardsConfig).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false).
569                 datastoreContextBuilder(followerDatastoreContextBuilder).build();
570         follower1EntityOwnershipService = newOwnershipService(follower1Node.configDataStore());
571
572         follower1EntityOwnershipService.registerCandidate(ENTITY1);
573         verify(leaderMockListener, timeout(20000)).ownershipChanged(ownershipChange(ENTITY1, false, false, true));
574
575         verifyRaftState(follower1Node.configDataStore(), ENTITY_OWNERSHIP_SHARD_NAME, raftState -> {
576             assertNull("Custom RaftPolicy class name", raftState.getCustomRaftPolicyClassName());
577             assertEquals("Peer count", 1, raftState.getPeerAddresses().keySet().size());
578             assertThat("Peer Id", Iterables.<String>getLast(raftState.getPeerAddresses().keySet()),
579                     org.hamcrest.CoreMatchers.containsString("member-1"));
580         });
581     }
582
583     private static void verifyGetOwnershipState(final EntityOwnershipService service, final Entity entity,
584             final boolean isOwner, final boolean hasOwner) {
585         Optional<EntityOwnershipState> state = service.getOwnershipState(entity);
586         assertEquals("getOwnershipState present", true, state.isPresent());
587         assertEquals("isOwner", isOwner, state.get().isOwner());
588         assertEquals("hasOwner", hasOwner, state.get().hasOwner());
589     }
590
591     private static void verifyCandidates(final DistributedDataStore dataStore, final Entity entity, final String... expCandidates) throws Exception {
592         AssertionError lastError = null;
593         Stopwatch sw = Stopwatch.createStarted();
594         while(sw.elapsed(TimeUnit.MILLISECONDS) <= 10000) {
595             Optional<NormalizedNode<?, ?>> possible = dataStore.newReadOnlyTransaction().read(
596                     entityPath(entity.getType(), entity.getId()).node(Candidate.QNAME)).get(5, TimeUnit.SECONDS);
597             try {
598                 assertEquals("Candidates not found for " + entity, true, possible.isPresent());
599                 Collection<String> actual = new ArrayList<>();
600                 for(MapEntryNode candidate: ((MapNode)possible.get()).getValue()) {
601                     actual.add(candidate.getChild(CANDIDATE_NAME_NODE_ID).get().getValue().toString());
602                 }
603
604                 assertEquals("Candidates for " + entity, Arrays.asList(expCandidates), actual);
605                 return;
606             } catch (AssertionError e) {
607                 lastError = e;
608                 Uninterruptibles.sleepUninterruptibly(300, TimeUnit.MILLISECONDS);
609             }
610         }
611
612         throw lastError;
613     }
614
615     private static void verifyOwner(final DistributedDataStore dataStore, final Entity entity, final String expOwner) {
616         AbstractEntityOwnershipTest.verifyOwner(expOwner, entity.getType(), entity.getId(),
617                 path -> {
618                     try {
619                         return dataStore.newReadOnlyTransaction().read(path).get(5, TimeUnit.SECONDS).get();
620                     } catch (Exception e) {
621                         return null;
622                     }
623                 });
624     }
625 }