- downPeerMemberNames.remove(message.getNewCandidate());
-
- String currentOwner = getCurrentOwner(message.getEntityPath());
- if(Strings.isNullOrEmpty(currentOwner)){
- EntityOwnerSelectionStrategy entityOwnerSelectionStrategy
- = getEntityOwnerElectionStrategy(message.getEntityPath());
- if(entityOwnerSelectionStrategy.selectionDelayInMillis() == 0L) {
- writeNewOwner(message.getEntityPath(), newOwner(message.getAllCandidates(),
- entityOwnerSelectionStrategy));
- } else {
- throw new UnsupportedOperationException("Delayed selection not implemented yet");
- }
+ downPeerMemberNames.remove(MemberName.forName(message.getNewCandidate()));
+
+ final String currentOwner = getCurrentOwner(message.getEntityPath());
+ final EntityOwnerSelectionStrategy strategy = getEntityOwnerElectionStrategy(message.getEntityPath());
+
+ // Available members is all the known peers - the number of peers that are down + self
+ // So if there are 2 peers and 1 is down then availableMembers will be 2
+ final int availableMembers = getRaftActorContext().getPeerIds().size() - downPeerMemberNames.size() + 1;
+
+ LOG.debug("{}: Using strategy {} to select owner, currentOwner = {}", persistenceId(), strategy, currentOwner);
+
+ if (strategy.getSelectionDelayInMillis() == 0L) {
+ writeNewOwner(message.getEntityPath(), newOwner(currentOwner, message.getAllCandidates(),
+ strategy));
+ } else if (message.getAllCandidates().size() == availableMembers) {
+ LOG.debug("{}: Received the maximum candidates requests : {} writing new owner",
+ persistenceId(), availableMembers);
+ cancelOwnerSelectionTask(message.getEntityPath());
+ writeNewOwner(message.getEntityPath(), newOwner(currentOwner, message.getAllCandidates(),
+ strategy));
+ } else {
+ scheduleOwnerSelection(message.getEntityPath(), message.getAllCandidates(), strategy);