Bug 4105: Change ownership on member down/up 04/26804/1
authorTom Pantelis <tpanteli@brocade.com>
Sat, 15 Aug 2015 21:23:50 +0000 (17:23 -0400)
committerTom Pantelis <tpanteli@brocade.com>
Thu, 10 Sep 2015 19:19:47 +0000 (15:19 -0400)
Added 2 new messages, PeerUp and PeerDown, that the ShardManager sends
in response to cluster member events.

For PeerDown, the EntityOwnershipShard finds the entities owned by the
down member and selects a new owner based on the remaining candidates.
If there's no other candidates, the owner is cleared (set to "") so new
candidates can become owner. The down members are also tracked via a
downPeerMemberNames set.

For PeerUp, if the up member is in the downPeerMemberNames, the
EntityOwnershipShard finds entities that previously had their owner
cleared and attempts to select a new owner. This handles the case where
a previously down member was the only candidate for an entity so, when
that member comes back up, the entity's owner will be re-assigned to
that member.

Reassigning of owners via PeerDown and PeerUp is only done on the
leader. However that may not handle the case where the leader goes down.
When a new leader is elected we need it to select new owners for
entities owned by the down leader. There are 2 cases here. If the old
leader has not yet been detected as down then eventually we expect to
get PeerDown to handle it. The second case is if PeerDown was already
received prior to the leader change (probably the norm), in which case
PeerDown would not have been processed. To handle this case I overrode
onLeaderChanged to select new owners for entities owned by the old leader
that is passed in. The RaftActor sends the old leader's peerId so I
added a peerIdToMemberNames map to translate - this is populated via
PeerUp. Also I changed the RaftActor to track and pass the actual last valid
leader id, previously it passed the leader id from the previous behavior
which would normally be Candidate which always has a null leaderId.

The newOwner method was changed to ignore candidates in the
downPeerMemberNames set as there's no point in assigning the owner to a
candidate known to be down.

Change-Id: I8f0b78460a1a3e2a6418431f8a8a770a789e8f8d
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
13 files changed:
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipService.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnersModel.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardCommitCoordinator.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PeerDown.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PeerUp.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/yang/entity-owners.yang
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockClusterWrapper.java
opendaylight/md-sal/sal-distributed-datastore/src/test/resources/simplelogger.properties

index e9e0d63..4e0f770 100644 (file)
@@ -323,19 +323,19 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             onStateChanged();
         }
 
-        String oldBehaviorLeaderId = oldBehavior == null ? null : oldBehaviorState.getLeaderId();
+        String lastValidLeaderId = oldBehavior == null ? null : oldBehaviorState.getLastValidLeaderId();
         String oldBehaviorStateName = oldBehavior == null ? null : oldBehavior.state().name();
 
         // it can happen that the state has not changed but the leader has changed.
         Optional<ActorRef> roleChangeNotifier = getRoleChangeNotifier();
-        if(!Objects.equal(oldBehaviorLeaderId, currentBehavior.getLeaderId()) ||
+        if(!Objects.equal(lastValidLeaderId, currentBehavior.getLeaderId()) ||
            oldBehaviorState.getLeaderPayloadVersion() != currentBehavior.getLeaderPayloadVersion()) {
             if(roleChangeNotifier.isPresent()) {
                 roleChangeNotifier.get().tell(newLeaderStateChanged(getId(), currentBehavior.getLeaderId(),
                         currentBehavior.getLeaderPayloadVersion()), getSelf());
             }
 
-            onLeaderChanged(oldBehaviorLeaderId, currentBehavior.getLeaderId());
+            onLeaderChanged(lastValidLeaderId, currentBehavior.getLeaderId());
         }
 
         if (roleChangeNotifier.isPresent() &&
@@ -670,21 +670,25 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     private static class BehaviorStateHolder {
         private RaftActorBehavior behavior;
-        private String leaderId;
+        private String lastValidLeaderId;
         private short leaderPayloadVersion;
 
         void init(RaftActorBehavior behavior) {
             this.behavior = behavior;
-            this.leaderId = behavior != null ? behavior.getLeaderId() : null;
             this.leaderPayloadVersion = behavior != null ? behavior.getLeaderPayloadVersion() : -1;
+
+            String behaviorLeaderId = behavior != null ? behavior.getLeaderId() : null;
+            if(behaviorLeaderId != null) {
+                this.lastValidLeaderId = behaviorLeaderId;
+            }
         }
 
         RaftActorBehavior getBehavior() {
             return behavior;
         }
 
-        String getLeaderId() {
-            return leaderId;
+        String getLastValidLeaderId() {
+            return lastValidLeaderId;
         }
 
         short getLeaderPayloadVersion() {
index c24156f..f8237b5 100644 (file)
@@ -55,6 +55,8 @@ import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardF
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
+import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
+import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
 import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary;
 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
@@ -176,6 +178,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             onActorInitialized(message);
         } else if (message instanceof ClusterEvent.MemberUp){
             memberUp((ClusterEvent.MemberUp) message);
+        } else if (message instanceof ClusterEvent.MemberExited){
+            memberExited((ClusterEvent.MemberExited) message);
         } else if(message instanceof ClusterEvent.MemberRemoved) {
             memberRemoved((ClusterEvent.MemberRemoved) message);
         } else if(message instanceof ClusterEvent.UnreachableMember) {
@@ -213,7 +217,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), createShard.getShardName());
             Map<String, String> peerAddresses = getPeerAddresses(createShard.getShardName(), createShard.getMemberNames());
 
-            LOG.debug("onCreateShard: shardId: {}, peerAddresses: {}", shardId, peerAddresses);
+            LOG.debug("onCreateShard: shardId: {}, memberNames: {}. peerAddresses: {}", shardId,
+                    createShard.getMemberNames(), peerAddresses);
 
             DatastoreContext shardDatastoreContext = createShard.getDatastoreContext();
             if(shardDatastoreContext == null) {
@@ -464,7 +469,24 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         LOG.debug("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
                 message.member().address());
 
-        memberNameToAddress.remove(message.member().roles().head());
+        memberNameToAddress.remove(memberName);
+
+        for(ShardInformation info : localShards.values()){
+            info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
+        }
+    }
+
+    private void memberExited(ClusterEvent.MemberExited message) {
+        String memberName = message.member().roles().head();
+
+        LOG.debug("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
+                message.member().address());
+
+        memberNameToAddress.remove(memberName);
+
+        for(ShardInformation info : localShards.values()){
+            info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
+        }
     }
 
     private void memberUp(ClusterEvent.MemberUp message) {
@@ -477,8 +499,10 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
         for(ShardInformation info : localShards.values()){
             String shardName = info.getShardName();
-            info.updatePeerAddress(getShardIdentifier(memberName, shardName).toString(),
-                getShardActorPath(shardName, memberName), getSelf());
+            String peerId = getShardIdentifier(memberName, shardName).toString();
+            info.updatePeerAddress(peerId, getShardActorPath(shardName, memberName), getSelf());
+
+            info.peerUp(memberName, peerId, getSelf());
         }
 
         checkReady();
@@ -507,6 +531,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
                 primaryShardInfoCache.remove(info.getShardName());
             }
+
+            info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
         }
     }
 
@@ -517,6 +543,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                 LOG.debug("Marking Leader {} as available.", leaderId);
                 info.setLeaderAvailable(true);
             }
+
+            info.peerUp(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
         }
     }
 
@@ -694,7 +722,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         for(String memberName : members) {
             if(!currentMemberName.equals(memberName)) {
                 ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
-                String path = getShardActorPath(shardName, currentMemberName);
+                String path = getShardActorPath(shardName, memberName);
                 peerAddresses.put(shardId.toString(), path);
             }
         }
@@ -808,13 +836,25 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                                 peerId, peerAddress, actor.path());
                     }
 
-                    actor.tell(new PeerAddressResolved(peerId.toString(), peerAddress), sender);
+                    actor.tell(new PeerAddressResolved(peerId, peerAddress), sender);
                 }
 
                 notifyOnShardInitializedCallbacks();
             }
         }
 
+        void peerDown(String memberName, String peerId, ActorRef sender) {
+            if(peerAddresses.containsKey(peerId) && actor != null) {
+                actor.tell(new PeerDown(memberName, peerId), sender);
+            }
+        }
+
+        void peerUp(String memberName, String peerId, ActorRef sender) {
+            if(peerAddresses.containsKey(peerId) && actor != null) {
+                actor.tell(new PeerUp(memberName, peerId), sender);
+            }
+        }
+
         boolean isShardReady() {
             return !RaftState.Candidate.name().equals(role) && !Strings.isNullOrEmpty(role);
         }
index d6dd9a4..c18a197 100644 (file)
@@ -29,7 +29,7 @@ import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipC
 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener;
 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListenerRegistration;
 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.clustering.entity.owners.rev150804.entity.owners.EntityOwner;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.clustering.entity.owners.rev150804.EntityOwners;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.Future;
@@ -57,7 +57,7 @@ public class DistributedEntityOwnershipService implements EntityOwnershipService
 
         Configuration configuration = datastore.getActorContext().getConfiguration();
         Collection<String> entityOwnersMemberNames = configuration.getUniqueMemberNamesForAllShards();
-        configuration.addModuleShardConfiguration(new ModuleShardConfiguration(EntityOwner.QNAME.getNamespace(),
+        configuration.addModuleShardConfiguration(new ModuleShardConfiguration(EntityOwners.QNAME.getNamespace(),
                 "entity-owners", ENTITY_OWNERSHIP_SHARD_NAME, ModuleShardStrategy.NAME, entityOwnersMemberNames));
 
         CreateShard createShard = new CreateShard(ENTITY_OWNERSHIP_SHARD_NAME,
index 46f3358..59d2844 100644 (file)
@@ -19,6 +19,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableOrderedMapNodeBuilder;
 
 /**
  * Utility methods for entity-owners yang model.
@@ -35,7 +36,12 @@ final class EntityOwnersModel {
 
     static final NodeIdentifier ENTITY_OWNERS_NODE_ID = new NodeIdentifier(EntityOwners.QNAME);
     static final NodeIdentifier ENTITY_OWNER_NODE_ID = new NodeIdentifier(ENTITY_OWNER_QNAME);
+    static final NodeIdentifier ENTITY_NODE_ID = new NodeIdentifier(ENTITY_QNAME);
+    static final NodeIdentifier CANDIDATE_NODE_ID = new NodeIdentifier(Candidate.QNAME);
+    static final NodeIdentifier CANDIDATE_NAME_NODE_ID = new NodeIdentifier(CANDIDATE_NAME_QNAME);
     static final YangInstanceIdentifier ENTITY_OWNERS_PATH = YangInstanceIdentifier.of(EntityOwners.QNAME);
+    static final YangInstanceIdentifier ENTITY_TYPES_PATH =
+            YangInstanceIdentifier.of(EntityOwners.QNAME).node(EntityType.QNAME);
 
     static YangInstanceIdentifier entityPath(String entityType, YangInstanceIdentifier entityId) {
         return YangInstanceIdentifier.builder(ENTITY_OWNERS_PATH).node(EntityType.QNAME).
@@ -77,8 +83,8 @@ final class EntityOwnersModel {
     }
 
     static MapNode candidateEntry(String candidateName) {
-        return ImmutableNodes.mapNodeBuilder(Candidate.QNAME).addChild(ImmutableNodes.mapEntry(
-                Candidate.QNAME, CANDIDATE_NAME_QNAME, candidateName)).build();
+        return ImmutableOrderedMapNodeBuilder.create().withNodeIdentifier(new NodeIdentifier(Candidate.QNAME)).
+                addChild(ImmutableNodes.mapEntry(Candidate.QNAME, CANDIDATE_NAME_QNAME, candidateName)).build();
     }
 
     static NormalizedNode<?, ?> entityEntryWithOwner(YangInstanceIdentifier entityId, String owner) {
index b1e63ae..a71c86d 100644 (file)
@@ -7,9 +7,13 @@
  */
 package org.opendaylight.controller.cluster.datastore.entityownership;
 
+import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.CANDIDATE_NAME_NODE_ID;
+import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.CANDIDATE_NODE_ID;
+import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_NODE_ID;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNERS_PATH;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNER_NODE_ID;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNER_QNAME;
+import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_TYPES_PATH;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.candidatePath;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.entityOwnersWithCandidate;
 import akka.actor.ActorRef;
@@ -18,8 +22,12 @@ import akka.actor.Props;
 import akka.pattern.Patterns;
 import com.google.common.base.Optional;
 import com.google.common.base.Strings;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
 import org.opendaylight.controller.cluster.datastore.Shard;
@@ -29,12 +37,18 @@ import org.opendaylight.controller.cluster.datastore.entityownership.messages.Re
 import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
+import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
+import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
 import org.opendaylight.controller.cluster.datastore.messages.SuccessReply;
 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
 import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
+import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
+import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
@@ -50,6 +64,8 @@ class EntityOwnershipShard extends Shard {
     private final String localMemberName;
     private final EntityOwnershipShardCommitCoordinator commitCoordinator;
     private final EntityOwnershipListenerSupport listenerSupport;
+    private final Set<String> downPeerMemberNames = new HashSet<>();
+    private final Map<String, String> peerIdToMemberNames = new HashMap<>();
 
     private static DatastoreContext noPersistenceDatastoreContext(DatastoreContext datastoreContext) {
         return DatastoreContext.newBuilderFrom(datastoreContext).persistent(false).build();
@@ -61,6 +77,11 @@ class EntityOwnershipShard extends Shard {
         this.localMemberName = localMemberName;
         this.commitCoordinator = new EntityOwnershipShardCommitCoordinator(localMemberName, LOG);
         this.listenerSupport = new EntityOwnershipListenerSupport(getContext());
+
+        for(String peerId: peerAddresses.keySet()) {
+            ShardIdentifier shardId = ShardIdentifier.builder().fromShardIdString(peerId).build();
+            peerIdToMemberNames.put(peerId, shardId.getMemberName());
+        }
     }
 
     @Override
@@ -86,13 +107,17 @@ class EntityOwnershipShard extends Shard {
             onCandidateAdded((CandidateAdded) message);
         } else if(message instanceof CandidateRemoved){
             onCandidateRemoved((CandidateRemoved) message);
+        } else if(message instanceof PeerDown) {
+            onPeerDown((PeerDown) message);
+        } else if(message instanceof PeerUp) {
+            onPeerUp((PeerUp) message);
         } else if(!commitCoordinator.handleMessage(message, this)) {
             super.onReceiveCommand(message);
         }
     }
 
     private void onRegisterCandidateLocal(RegisterCandidateLocal registerCandidate) {
-        LOG.debug("onRegisterCandidateLocal: {}", registerCandidate);
+        LOG.debug("{}: onRegisterCandidateLocal: {}", persistenceId(), registerCandidate);
 
         listenerSupport.addEntityOwnershipListener(registerCandidate.getEntity(), registerCandidate.getCandidate());
 
@@ -104,7 +129,7 @@ class EntityOwnershipShard extends Shard {
     }
 
     private void onUnregisterCandidateLocal(UnregisterCandidateLocal unregisterCandidate) {
-        LOG.debug("onUnregisterCandidateLocal: {}", unregisterCandidate);
+        LOG.debug("{}: onUnregisterCandidateLocal: {}", persistenceId(), unregisterCandidate);
 
         Entity entity = unregisterCandidate.getEntity();
         listenerSupport.removeEntityOwnershipListener(entity, unregisterCandidate.getCandidate());
@@ -117,7 +142,7 @@ class EntityOwnershipShard extends Shard {
 
     void tryCommitModifications(final BatchedModifications modifications) {
         if(isLeader()) {
-            LOG.debug("Committing BatchedModifications {} locally", modifications.getTransactionID());
+            LOG.debug("{}: Committing BatchedModifications {} locally", persistenceId(), modifications.getTransactionID());
 
             // Note that it's possible the commit won't get consensus and will timeout and not be applied
             // to the state. However we don't need to retry it in that case b/c it will be committed to
@@ -127,7 +152,10 @@ class EntityOwnershipShard extends Shard {
         } else {
             final ActorSelection leader = getLeader();
             if (leader != null) {
-                LOG.debug("Sending BatchedModifications {} to leader {}", modifications.getTransactionID(), leader);
+                if(LOG.isDebugEnabled()) {
+                    LOG.debug("{}: Sending BatchedModifications {} to leader {}", persistenceId(),
+                            modifications.getTransactionID(), leader);
+                }
 
                 Future<Object> future = Patterns.ask(leader, modifications, TimeUnit.SECONDS.toMillis(
                         getDatastoreContext().getShardTransactionCommitTimeoutInSeconds()));
@@ -148,12 +176,33 @@ class EntityOwnershipShard extends Shard {
         commitCoordinator.onStateChanged(this, isLeader());
     }
 
+    @Override
+    protected void onLeaderChanged(String oldLeader, String newLeader) {
+        super.onLeaderChanged(oldLeader, newLeader);
+
+        LOG.debug("{}: onLeaderChanged: oldLeader: {}, newLeader: {}, isLeader: {}", persistenceId(), oldLeader,
+                newLeader, isLeader());
+
+        if(isLeader()) {
+            // We were just elected leader. If the old leader is down, select new owners for the entities
+            // owned by the down leader.
+
+            String oldLeaderMemberName = peerIdToMemberNames.get(oldLeader);
+
+            LOG.debug("{}: oldLeaderMemberName: {}", persistenceId(), oldLeaderMemberName);
+
+            if(downPeerMemberNames.contains(oldLeaderMemberName)) {
+                selectNewOwnerForEntitiesOwnedBy(oldLeaderMemberName);
+            }
+        }
+    }
+
     private void onCandidateRemoved(CandidateRemoved message) {
-        if(!isLeader()){
+        if(!isLeader()) {
             return;
         }
 
-        LOG.debug("onCandidateRemoved: {}", message);
+        LOG.debug("{}: onCandidateRemoved: {}", persistenceId(), message);
 
         String currentOwner = getCurrentOwner(message.getEntityPath());
         if(message.getRemovedCandidate().equals(currentOwner)){
@@ -166,7 +215,7 @@ class EntityOwnershipShard extends Shard {
             return;
         }
 
-        LOG.debug("onCandidateAdded: {}", message);
+        LOG.debug("{}: onCandidateAdded: {}", persistenceId(), message);
 
         String currentOwner = getCurrentOwner(message.getEntityPath());
         if(Strings.isNullOrEmpty(currentOwner)){
@@ -174,16 +223,88 @@ class EntityOwnershipShard extends Shard {
         }
     }
 
+    private void onPeerDown(PeerDown peerDown) {
+        LOG.debug("{}: onPeerDown: {}", persistenceId(), peerDown);
+
+        String downMemberName = peerDown.getMemberName();
+        if(downPeerMemberNames.add(downMemberName) && isLeader()) {
+            // Select new owners for entities owned by the down peer.
+            selectNewOwnerForEntitiesOwnedBy(downMemberName);
+        }
+    }
+
+    private void onPeerUp(PeerUp peerUp) {
+        LOG.debug("{}: onPeerUp: {}", persistenceId(), peerUp);
+
+        peerIdToMemberNames.put(peerUp.getPeerId(), peerUp.getMemberName());
+
+        if(downPeerMemberNames.remove(peerUp.getMemberName()) && isLeader()) {
+            // This peer was previously down - for its previously owned entities, if there were no other
+            // candidates, the owner would have been cleared so handle that here by trying to re-assign
+            // ownership for entities whose owner is cleared.
+            selectNewOwnerForEntitiesOwnedBy("");
+        }
+    }
+
+    private void selectNewOwnerForEntitiesOwnedBy(String owner) {
+        DataTreeSnapshot snapshot = getDataStore().getDataTree().takeSnapshot();
+        Optional<NormalizedNode<?, ?>> possibleEntityTypes = snapshot.readNode(ENTITY_TYPES_PATH);
+        if(!possibleEntityTypes.isPresent()) {
+            return;
+        }
+
+        LOG.debug("{}: Searching for entities owned by {}", persistenceId(), owner);
+
+        BatchedModifications modifications = commitCoordinator.newBatchedModifications();
+        for(MapEntryNode entityType:  ((MapNode) possibleEntityTypes.get()).getValue()) {
+            Optional<DataContainerChild<? extends PathArgument, ?>> possibleEntities =
+                    entityType.getChild(ENTITY_NODE_ID);
+            if(!possibleEntities.isPresent()) {
+                continue; // shouldn't happen but handle anyway
+            }
+
+            for(MapEntryNode entity:  ((MapNode) possibleEntities.get()).getValue()) {
+                Optional<DataContainerChild<? extends PathArgument, ?>> possibleOwner =
+                        entity.getChild(ENTITY_OWNER_NODE_ID);
+                if(possibleOwner.isPresent() && owner.equals(possibleOwner.get().getValue().toString())) {
+                    Object newOwner = newOwner(getCandidateNames(entity));
+                    YangInstanceIdentifier entityPath = YangInstanceIdentifier.builder(ENTITY_TYPES_PATH).
+                            node(entityType.getIdentifier()).node(ENTITY_NODE_ID).node(entity.getIdentifier()).
+                                    node(ENTITY_OWNER_NODE_ID).build();
+
+                    LOG.debug("{}: Found entity {}, writing new owner {}", persistenceId(), entityPath, newOwner);
+
+                    modifications.addModification(new WriteModification(entityPath,
+                            ImmutableNodes.leafNode(ENTITY_OWNER_NODE_ID, newOwner)));
+                }
+            }
+        }
+
+        commitCoordinator.commitModifications(modifications, this);
+    }
+
+    private Collection<String> getCandidateNames(MapEntryNode entity) {
+        Collection<MapEntryNode> candidates = ((MapNode)entity.getChild(CANDIDATE_NODE_ID).get()).getValue();
+        Collection<String> candidateNames = new ArrayList<>(candidates.size());
+        for(MapEntryNode candidate: candidates) {
+            candidateNames.add(candidate.getChild(CANDIDATE_NAME_NODE_ID).get().getValue().toString());
+        }
+
+        return candidateNames;
+    }
+
     private void writeNewOwner(YangInstanceIdentifier entityPath, String newOwner) {
-        LOG.debug("Writing new owner {} for entity {}", newOwner, entityPath);
+        LOG.debug("{}: Writing new owner {} for entity {}", persistenceId(), newOwner, entityPath);
 
         commitCoordinator.commitModification(new WriteModification(entityPath.node(ENTITY_OWNER_QNAME),
                 ImmutableNodes.leafNode(ENTITY_OWNER_NODE_ID, newOwner)), this);
     }
 
     private String newOwner(Collection<String> candidates) {
-        if(candidates.size() > 0){
-            return candidates.iterator().next();
+        for(String candidate: candidates) {
+            if(!downPeerMemberNames.contains(candidate)) {
+                return candidate;
+            }
         }
 
         return "";
index 6c15ef6..172849c 100644 (file)
@@ -140,18 +140,26 @@ class EntityOwnershipShardCommitCoordinator {
     }
 
     void commitModification(Modification modification, EntityOwnershipShard shard) {
+        BatchedModifications modifications = newBatchedModifications();
+        modifications.addModification(modification);
+        commitModifications(modifications, shard);
+    }
+
+    void commitModifications(BatchedModifications modifications, EntityOwnershipShard shard) {
+        if(modifications.getModifications().isEmpty()) {
+            return;
+        }
+
         boolean hasLeader = shard.hasLeader();
         if(inflightCommit != null || !hasLeader) {
             if(log.isDebugEnabled()) {
-                log.debug("{} - adding modification to pending",
+                log.debug("{} - adding modifications to pending",
                         (inflightCommit != null ? "A commit is inflight" : "No shard leader"));
             }
 
-            pendingModifications.add(modification);
+            pendingModifications.addAll(modifications.getModifications());
         } else {
-            inflightCommit = newBatchedModifications();
-            inflightCommit.addModification(modification);
-
+            inflightCommit = modifications;
             shard.tryCommitModifications(inflightCommit);
         }
     }
@@ -179,7 +187,7 @@ class EntityOwnershipShardCommitCoordinator {
         inflightCommit = newBatchedModifications;
     }
 
-    private BatchedModifications newBatchedModifications() {
+    BatchedModifications newBatchedModifications() {
         BatchedModifications modifications = new BatchedModifications(
                 TransactionIdentifier.create(localMemberName, ++transactionIDCounter).toString(),
                 DataStoreVersions.CURRENT_VERSION, "");
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PeerDown.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PeerDown.java
new file mode 100644 (file)
index 0000000..c6254d1
--- /dev/null
@@ -0,0 +1,37 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+/**
+ * Message sent to a shard actor indicating one of its peers is down.
+ *
+ * @author Thomas Pantelis
+ */
+public class PeerDown {
+    private final String memberName;
+    private final String peerId;
+
+    public PeerDown(String memberName, String peerId) {
+        this.memberName = memberName;
+        this.peerId = peerId;
+    }
+
+    public String getMemberName() {
+        return memberName;
+    }
+
+
+    public String getPeerId() {
+        return peerId;
+    }
+
+    @Override
+    public String toString() {
+        return "PeerDown [memberName=" + memberName + ", peerId=" + peerId + "]";
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PeerUp.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PeerUp.java
new file mode 100644 (file)
index 0000000..9e197ac
--- /dev/null
@@ -0,0 +1,37 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+/**
+ * Message sent to a shard actor indicating one of its peers is up.
+ *
+ * @author Thomas Pantelis
+ */
+public class PeerUp {
+    private final String memberName;
+    private final String peerId;
+
+    public PeerUp(String memberName, String peerId) {
+        this.memberName = memberName;
+        this.peerId = peerId;
+    }
+
+    public String getMemberName() {
+        return memberName;
+    }
+
+
+    public String getPeerId() {
+        return peerId;
+    }
+
+    @Override
+    public String toString() {
+        return "PeerUp [memberName=" + memberName + ", peerId=" + peerId + "]";
+    }
+}
\ No newline at end of file
index 24fe7a3..0f37e13 100644 (file)
@@ -14,31 +14,6 @@ module entity-owners {
 
     container entity-owners {
 
-        // The entity-owner list contains a list of all entities which have been assigned an owner
-        // Since an owner will be a cluster member it gives us a quick way to figure out all the entities
-        // that will need to have new owners assigned on the failure of a given cluster member
-        list entity-owner {
-            key name;
-            leaf name {
-                type string;
-            }
-
-            // Group all entities of a given type together
-            list entity-type {
-                key type;
-                leaf type {
-                    type string;
-                }
-
-                list entity {
-                    key id;
-                    leaf id {
-                        type string;
-                    }
-                }
-            }
-        }
-
         // A list of all entities grouped by type
         list entity-type {
             key type;
@@ -48,6 +23,7 @@ module entity-owners {
 
             list entity {
                 key id;
+
                 leaf id {
                     type instance-identifier;
                 }
@@ -55,10 +31,12 @@ module entity-owners {
                 leaf owner {
                     type string;
                 }
-                
+
                 // A list of all the candidates that would like to own the entity
                 list candidate {
                     key name;
+                    ordered-by user;
+
                     leaf name {
                         type string;
                     }
index bd0dc4d..1ffa679 100644 (file)
@@ -61,12 +61,13 @@ import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
+import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
+import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
 import org.opendaylight.controller.cluster.datastore.messages.SwitchShardBehavior;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
-import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
@@ -77,6 +78,7 @@ import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
+import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
@@ -521,6 +523,15 @@ public class ShardManagerTest extends AbstractActorTest {
 
             shardManager1.underlyingActor().waitForUnreachableMember();
 
+            PeerDown peerDown = MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerDown.class);
+            assertEquals("getMemberName", "member-2", peerDown.getMemberName());
+            MessageCollectorActor.clearMessages(mockShardActor1);
+
+            shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
+                    createMemberRemoved("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
+
+            MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerDown.class);
+
             shardManager1.tell(new FindPrimary("default", true), getRef());
 
             expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
@@ -530,12 +541,21 @@ public class ShardManagerTest extends AbstractActorTest {
 
             shardManager1.underlyingActor().waitForReachableMember();
 
+            PeerUp peerUp = MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerUp.class);
+            assertEquals("getMemberName", "member-2", peerUp.getMemberName());
+            MessageCollectorActor.clearMessages(mockShardActor1);
+
             shardManager1.tell(new FindPrimary("default", true), getRef());
 
             RemotePrimaryShardFound found1 = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
             String path1 = found1.getPrimaryPath();
             assertTrue("Unexpected primary path " + path1, path1.contains("member-2-shard-default-config"));
 
+            shardManager1.underlyingActor().onReceiveCommand(MockClusterWrapper.
+                    createMemberUp("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"));
+
+            MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerUp.class);
+
         }};
 
         JavaTestKit.shutdownActorSystem(system1);
index ec2006d..8ba17c0 100644 (file)
@@ -19,9 +19,12 @@ import static org.opendaylight.controller.cluster.datastore.entityownership.Enti
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.candidatePath;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.entityOwnersWithCandidate;
 import akka.actor.ActorRef;
+import akka.actor.PoisonPill;
 import akka.actor.Props;
+import akka.actor.Terminated;
 import akka.actor.UntypedActor;
 import akka.dispatch.Dispatchers;
+import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
 import com.google.common.base.Function;
 import com.google.common.collect.ImmutableMap;
@@ -46,11 +49,15 @@ import org.opendaylight.controller.cluster.datastore.entityownership.messages.Un
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
+import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
+import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
 import org.opendaylight.controller.cluster.datastore.messages.SuccessReply;
 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
 import org.opendaylight.controller.cluster.datastore.modification.Modification;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.TestActorFactory;
+import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
@@ -74,13 +81,14 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
             YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity1"));
     private static final YangInstanceIdentifier ENTITY_ID2 =
             YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity2"));
+    private static final YangInstanceIdentifier ENTITY_ID3 =
+            YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity3"));
+    private static final YangInstanceIdentifier ENTITY_ID4 =
+            YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity4"));
     private static final SchemaContext SCHEMA_CONTEXT = SchemaContextHelper.entityOwners();
     private static final AtomicInteger NEXT_SHARD_NUM = new AtomicInteger();
     private static final String LOCAL_MEMBER_NAME = "member-1";
 
-    private final ShardIdentifier shardID = ShardIdentifier.builder().memberName(LOCAL_MEMBER_NAME)
-            .shardName("entity-ownership").type("operational" + NEXT_SHARD_NUM.getAndIncrement()).build();
-
     private final Builder dataStoreContextBuilder = DatastoreContext.newBuilder();
     private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
 
@@ -115,8 +123,8 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
 
         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2);
 
-        String peerId = actorFactory.generateActorId("follower");
-        TestActorRef<MockFollower> peer = actorFactory.createTestActor(Props.create(MockFollower.class, peerId).
+        String peerId = newShardId("follower").toString();
+        TestActorRef<MockFollower> peer = actorFactory.createTestActor(Props.create(MockFollower.class, peerId, false).
                 withDispatcher(Dispatchers.DefaultDispatcherId()), peerId);
 
         TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps(
@@ -147,12 +155,11 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2).
                 shardTransactionCommitTimeoutInSeconds(1);
 
-        String peerId = actorFactory.generateActorId("follower");
+        String peerId = newShardId("follower").toString();
         TestActorRef<MockFollower> peer = actorFactory.createTestActor(Props.create(MockFollower.class, peerId).
                 withDispatcher(Dispatchers.DefaultDispatcherId()), peerId);
 
         MockFollower follower = peer.underlyingActor();
-        follower.grantVote = true;
 
         // Drop AppendEntries so consensus isn't reached.
         follower.dropAppendEntries = true;
@@ -191,12 +198,11 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2).
                 shardIsolatedLeaderCheckIntervalInMillis(50);
 
-        String peerId = actorFactory.generateActorId("follower");
+        String peerId = newShardId("follower").toString();
         TestActorRef<MockFollower> peer = actorFactory.createTestActor(Props.create(MockFollower.class, peerId).
                 withDispatcher(Dispatchers.DefaultDispatcherId()), peerId);
 
         MockFollower follower = peer.underlyingActor();
-        follower.grantVote = true;
 
         TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps(
                 ImmutableMap.<String, String>builder().put(peerId, peer.path().toString()).build()).
@@ -231,7 +237,7 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(100).
                 shardBatchedModificationCount(5);
 
-        String peerId = actorFactory.generateActorId("leader");
+        String peerId = newShardId("leader").toString();
         TestActorRef<MockLeader> peer = actorFactory.createTestActor(Props.create(MockLeader.class).
                 withDispatcher(Dispatchers.DefaultDispatcherId()), peerId);
 
@@ -411,6 +417,189 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
         verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName2);
     }
 
+    @Test
+    public void testOwnerChangesOnPeerAvailabilityChanges() throws Exception {
+        ShardTestKit kit = new ShardTestKit(getSystem());
+
+        dataStoreContextBuilder.shardHeartbeatIntervalInMillis(500).shardElectionTimeoutFactor(10000);
+
+        String peerMemberName1 = "peerMember1";
+        String peerMemberName2 = "peerMember2";
+
+        ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
+        ShardIdentifier peerId1 = newShardId(peerMemberName1);
+        ShardIdentifier peerId2 = newShardId(peerMemberName2);
+
+        TestActorRef<EntityOwnershipShard> peer1 = actorFactory.createTestActor(newShardProps(peerId1,
+                ImmutableMap.<String, String>builder().put(leaderId.toString(), ""). put(peerId2.toString(), "").build(),
+                        peerMemberName1).withDispatcher(Dispatchers.DefaultDispatcherId()), peerId1.toString());
+
+        TestActorRef<EntityOwnershipShard> peer2 = actorFactory.createTestActor(newShardProps(peerId2,
+                ImmutableMap.<String, String>builder().put(leaderId.toString(), ""). put(peerId1.toString(), "").build(),
+                        peerMemberName2). withDispatcher(Dispatchers.DefaultDispatcherId()), peerId2.toString());
+
+        TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(newShardProps(leaderId,
+                ImmutableMap.<String, String>builder().put(peerId1.toString(), peer1.path().toString()).
+                        put(peerId2.toString(), peer2.path().toString()).build(), LOCAL_MEMBER_NAME).
+                withDispatcher(Dispatchers.DefaultDispatcherId()), leaderId.toString());
+        leader.tell(new ElectionTimeout(), leader);
+
+        kit.waitUntilLeader(leader);
+
+        EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
+
+        // Send PeerDown and PeerUp with no entities
+
+        leader.tell(new PeerDown(peerMemberName2, peerId2.toString()), ActorRef.noSender());
+        leader.tell(new PeerUp(peerMemberName2, peerId2.toString()), ActorRef.noSender());
+
+        // Add candidates for entity1 with the local leader as the owner
+
+        leader.tell(new RegisterCandidateLocal(candidate, new Entity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
+        kit.expectMsgClass(SuccessReply.class);
+        verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
+
+        commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, peerMemberName2), kit);
+        verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, peerMemberName2);
+
+        commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, peerMemberName1), kit);
+        verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, peerMemberName1);
+        verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
+
+        // Add candidates for entity2 with peerMember2 as the owner
+
+        commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID2, peerMemberName2), kit);
+        verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName2);
+
+        commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID2, peerMemberName1), kit);
+        verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName1);
+
+        leader.tell(new RegisterCandidateLocal(candidate, new Entity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
+        kit.expectMsgClass(SuccessReply.class);
+        verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, LOCAL_MEMBER_NAME);
+        verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName2);
+
+        // Add candidates for entity3 with peerMember2 as the owner.
+
+        commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID3, peerMemberName2), kit);
+        verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, peerMemberName2);
+
+        leader.tell(new RegisterCandidateLocal(candidate, new Entity(ENTITY_TYPE, ENTITY_ID3)), kit.getRef());
+        kit.expectMsgClass(SuccessReply.class);
+        verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
+
+        commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID3, peerMemberName1), kit);
+        verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, peerMemberName1);
+        verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, peerMemberName2);
+
+        // Add only candidate peerMember2 for entity4.
+
+        commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID4, peerMemberName2), kit);
+        verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID4, peerMemberName2);
+        verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, peerMemberName2);
+
+        // Kill peerMember2 and send PeerDown - the entities (2, 3, 4) owned by peerMember2 should get a new
+        // owner selected
+
+        kit.watch(peer2);
+        peer2.tell(PoisonPill.getInstance(), ActorRef.noSender());
+        kit.expectMsgClass(JavaTestKit.duration("5 seconds"), Terminated.class);
+        kit.unwatch(peer2);
+
+        leader.tell(new PeerDown(peerMemberName2, peerId2.toString()), ActorRef.noSender());
+        // Send PeerDown again - should be noop
+        leader.tell(new PeerDown(peerMemberName2, peerId2.toString()), ActorRef.noSender());
+        peer1.tell(new PeerDown(peerMemberName2, peerId2.toString()), ActorRef.noSender());
+
+        verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, ""); // no other candidates so should clear
+        verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
+        verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName1);
+        verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
+
+        verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, peerMemberName2);
+        verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName2);
+        verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, peerMemberName2);
+        verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID4, peerMemberName2);
+
+        // Reinstate peerMember2 - should become owner again for entity 4
+
+        peer2 = actorFactory.createTestActor(newShardProps(peerId2,
+                ImmutableMap.<String, String>builder().put(leaderId.toString(), ""). put(peerId1.toString(), "").build(),
+                        peerMemberName2). withDispatcher(Dispatchers.DefaultDispatcherId()), peerId2.toString());
+        leader.tell(new PeerUp(peerMemberName2, peerId2.toString()), ActorRef.noSender());
+        // Send PeerUp again - should be noop
+        leader.tell(new PeerUp(peerMemberName2, peerId2.toString()), ActorRef.noSender());
+        peer1.tell(new PeerUp(peerMemberName2, peerId2.toString()), ActorRef.noSender());
+
+        verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, peerMemberName2);
+        verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
+        verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName1);
+        verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
+
+        // Kill peerMember1 and send PeerDown - entity 2 should get a new owner selected
+
+        peer1.tell(PoisonPill.getInstance(), ActorRef.noSender());
+        leader.tell(new PeerDown(peerMemberName1, peerId1.toString()), ActorRef.noSender());
+
+        verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName2);
+
+        // Verify the reinstated peerMember2 is fully synced.
+
+        verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID4, peerMemberName2);
+        verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
+        verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID2, peerMemberName2);
+        verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
+
+        // Reinstate peerMember1 and verify no owner changes
+
+        peer1 = actorFactory.createTestActor(newShardProps(peerId1,
+                ImmutableMap.<String, String>builder().put(leaderId.toString(), ""). put(peerId2.toString(), "").build(),
+                        peerMemberName1).withDispatcher(Dispatchers.DefaultDispatcherId()), peerId1.toString());
+        leader.tell(new PeerUp(peerMemberName1, peerId1.toString()), ActorRef.noSender());
+
+        verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, peerMemberName2);
+        verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
+        verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName2);
+        verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
+
+        // Verify the reinstated peerMember1 is fully synced.
+
+        verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID4, peerMemberName2);
+        verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
+        verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID2, peerMemberName2);
+        verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
+
+        // Kill the local leader and elect peer2 the leader. This should cause a new owner to be selected for
+        // the entities (1 and 3) previously owned by the local leader member.
+
+        peer2.tell(new PeerAddressResolved(peerId1.toString(), peer1.path().toString()), ActorRef.noSender());
+        peer2.tell(new PeerUp(LOCAL_MEMBER_NAME, leaderId.toString()), ActorRef.noSender());
+        peer2.tell(new PeerUp(peerMemberName1, peerId1.toString()), ActorRef.noSender());
+
+        leader.tell(PoisonPill.getInstance(), ActorRef.noSender());
+        peer2.tell(new PeerDown(LOCAL_MEMBER_NAME, leaderId.toString()), ActorRef.noSender());
+        peer2.tell(new ElectionTimeout(), peer2);
+
+        kit.waitUntilLeader(peer2);
+
+        verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID4, peerMemberName2);
+        verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID3, peerMemberName2);
+        verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID2, peerMemberName2);
+        verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID1, peerMemberName2);
+    }
+
+    private void commitModification(TestActorRef<EntityOwnershipShard> shard, NormalizedNode<?, ?> node,
+            JavaTestKit sender) {
+        BatchedModifications modifications = new BatchedModifications("tnx", DataStoreVersions.CURRENT_VERSION, "");
+        modifications.setDoCommitOnReady(true);
+        modifications.setReady(true);
+        modifications.setTotalMessagesSent(1);
+        modifications.addModification(new MergeModification(ENTITY_OWNERS_PATH, node));
+
+        shard.tell(modifications, sender.getRef());
+        sender.expectMsgClass(CommitTransactionReply.SERIALIZABLE_CLASS);
+    }
+
     private void verifyEntityCandidateRemoved(final TestActorRef<EntityOwnershipShard> shard, String entityType,
             YangInstanceIdentifier entityId, String candidateName) {
         verifyNodeRemoved(candidatePath(entityType, entityId, candidateName),
@@ -472,8 +661,17 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
     }
 
     private Props newShardProps(Map<String,String> peers) {
-        return EntityOwnershipShard.props(shardID, peers, dataStoreContextBuilder.build(), SCHEMA_CONTEXT,
-                LOCAL_MEMBER_NAME);
+        return newShardProps(newShardId(LOCAL_MEMBER_NAME), peers, LOCAL_MEMBER_NAME);
+    }
+
+    private Props newShardProps(ShardIdentifier shardId, Map<String,String> peers, String memberName) {
+        return EntityOwnershipShard.props(shardId, peers, dataStoreContextBuilder.build(),
+                SCHEMA_CONTEXT, memberName);
+    }
+
+    private ShardIdentifier newShardId(String memberName) {
+        return ShardIdentifier.builder().memberName(memberName).shardName("entity-ownership").
+                type("operational" + NEXT_SHARD_NUM.getAndIncrement()).build();
     }
 
     public static class MockFollower extends UntypedActor {
@@ -482,7 +680,12 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
         private final String myId;
 
         public MockFollower(String myId) {
+            this(myId, true);
+        }
+
+        public MockFollower(String myId, boolean grantVote) {
             this.myId = myId;
+            this.grantVote = grantVote;
         }
 
         @Override
index 756ed07..dbbc2f9 100644 (file)
@@ -57,7 +57,7 @@ public class MockClusterWrapper implements ClusterWrapper{
         to.tell(createMemberRemoved(memberName, address), null);
     }
 
-    private static ClusterEvent.MemberRemoved createMemberRemoved(String memberName, String address) {
+    public static ClusterEvent.MemberRemoved createMemberRemoved(String memberName, String address) {
         akka.cluster.UniqueAddress uniqueAddress = new UniqueAddress(
             AddressFromURIString.parse(address), 55);
 
index 3dd752e..db271ba 100644 (file)
@@ -3,4 +3,5 @@ org.slf4j.simpleLogger.dateTimeFormat=hh:mm:ss,S a
 org.slf4j.simpleLogger.logFile=System.out
 org.slf4j.simpleLogger.showShortLogName=true
 org.slf4j.simpleLogger.levelInBrackets=true
-org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.datastore=debug
\ No newline at end of file
+org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.datastore=debug
+org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.datastore.node.utils.stream=off
\ No newline at end of file

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.