Bug 2187: Bootstrap EOS shard when no local shards configured 76/29576/12
authorTom Pantelis <tpanteli@brocade.com>
Thu, 12 Nov 2015 08:47:27 +0000 (03:47 -0500)
committerGerrit Code Review <gerrit@opendaylight.org>
Tue, 17 Nov 2015 00:41:47 +0000 (00:41 +0000)
The intended workflow to initially form a cluster dynamically is to
change the role for a second node to say member-2. Since the initial
static shard config is bootstrapped to member-1, no local shards will be
created. However, the entity-ownership shard is special in that it is
intended to exist on every node.

The EOS will be boostrapped as follows:

For the EOS CreateShard message, all unique members for all shards are
obtained from the static shard config. It assumes the local member is
present in the config however in the above workflow it won't be. So on EOS
CreateShard, if the local member isn’t in the initial member list then
it will create the local shard with an empty peer list and
DisableElectionsRaftPolicy so it stays as follower. Also the shard will
be flagged as inactive in the ShardManager. A subsequent
AddShardReplica will be needed to make it active.

The other option is to not create EOS shard but there may be initial
candidate registrations which would be missed unless we add retry logic
in the service class. But the EOS shard already has retry logic so it
would be ideal to leverage it.

I also made changes to the AddShardReplica logic to handle an existing
local shard as will occur for the EOS shard:

 - remove the failure reply if local shard already exists
 - if the local shard exists and the primary shard is the local shard,
   do nothing and return AlreadyExistsException failure reply
 - otherwise send AddServer to the primary
 - on FindPrimary, if the local shard exists but is not active, do a
   remote find as if the local shard doesn't exist
 - on AddServer, if the new server is already in the peer list, the
   ALREADY_EXISTS status is returned. Return AlreadyExistsException
   failure reply
 - on AddServer failure, if the local shard was pre-existing don't
   remove it.

We still want to prevent an AddShardReplica request which one is already
in progress so I added a Set to track this.

I added an integration test for bootstrapping the EOS shard. It starts
with an inactive shard and registers a candidate, which gets queued
since there's no leader. It then issues AddShardReplica and verifies the
candidate gets registered with the leader.

To get this to work required some teaks in the RaftActor and Follower.
When the ShardManager clears the DisableElectionsRaftPolicy, the
RaftActor creates a new Follower instance however it loses the previous
leader Id. If the new server config hasn't been replicated yet then it
has no peers and immediately tries to start an election. Since it has no
peers it goes to Leader wih no followers creating a 2 leader situation.
To alleviate this I transferred the previous leader Id to the new
Follower instance to prevent the immediate election.

Eeven after that the test still didn't work b/c the leader was still not
in the EOS shard peer list so lookup of the leader address returned
null. So I changed getPeerAddress in the RaftActorContext to lookup in
the resolver if no peer info exists.

I also added more units for AddShardReplica to increase code coverage.

Change-Id: Id2a12ae226af69611d5ca5155f5f018cef82dff4
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorContextImplTest.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/exceptions/AlreadyExistsException.java [moved from opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateShardReply.java with 55% similarity]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipServiceTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/resources/module-shards-default-member-1.conf [new file with mode: 0644]

index 5f6f3ec24e573c4dedba6ba42a38dbbe28a51698..b93ea4dd8a8b3e6028d2943f87c84d17ee3eb01a 100644 (file)
@@ -502,12 +502,23 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         String newRaftPolicy = configParams.
             getCustomRaftPolicyImplementationClass();
 
         String newRaftPolicy = configParams.
             getCustomRaftPolicyImplementationClass();
 
-        LOG.debug ("RaftPolicy used with prev.config {}, RaftPolicy used with newConfig {}",
+        LOG.debug("{}: RaftPolicy used with prev.config {}, RaftPolicy used with newConfig {}", persistenceId(),
             oldRaftPolicy, newRaftPolicy);
         context.setConfigParams(configParams);
         if (!Objects.equal(oldRaftPolicy, newRaftPolicy)) {
             oldRaftPolicy, newRaftPolicy);
         context.setConfigParams(configParams);
         if (!Objects.equal(oldRaftPolicy, newRaftPolicy)) {
-            //RaftPolicy is modifed for the Actor. Re-initialize its current behaviour
-            initializeBehavior();
+            // The RaftPolicy was modified. If the current behavior is Follower then re-initialize to Follower
+            // but transfer the previous leaderId so it doesn't immediately try to schedule an election. This
+            // avoids potential disruption. Otherwise, switch to Follower normally.
+            RaftActorBehavior behavior = currentBehavior.getDelegate();
+            if(behavior instanceof Follower) {
+                String previousLeaderId = ((Follower)behavior).getLeaderId();
+
+                LOG.debug("{}: Re-initializing to Follower with previous leaderId {}", persistenceId(), previousLeaderId);
+
+                changeCurrentBehavior(new Follower(context, previousLeaderId));
+            } else {
+                initializeBehavior();
+            }
         }
     }
 
         }
     }
 
index 66059b5d62c1b756517cb691ab53f938b42006b0..a96d5026639c693c7164616040c332c72b640b8d 100644 (file)
@@ -13,7 +13,6 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
 import akka.actor.Props;
 import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
 import akka.actor.Props;
-
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Supplier;
 import java.util.ArrayList;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Supplier;
 import java.util.ArrayList;
@@ -24,8 +23,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import java.util.Map;
 import java.util.Set;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
-import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
 import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
 import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
+import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
 import org.slf4j.Logger;
 
 public class RaftActorContextImpl implements RaftActorContext {
 import org.slf4j.Logger;
 
 public class RaftActorContextImpl implements RaftActorContext {
@@ -181,6 +180,8 @@ public class RaftActorContextImpl implements RaftActorContext {
                 peerAddress = configParams.getPeerAddressResolver().resolve(peerId);
                 peerInfo.setAddress(peerAddress);
             }
                 peerAddress = configParams.getPeerAddressResolver().resolve(peerId);
                 peerInfo.setAddress(peerAddress);
             }
+        } else {
+            peerAddress = configParams.getPeerAddressResolver().resolve(peerId);
         }
 
         return peerAddress;
         }
 
         return peerAddress;
index 3b1e69d0ac5c3270c0bdf761b653c974efe9de7e..0149b57e6e94f8ddf076a406f14eea1546d2aa1e 100644 (file)
@@ -45,12 +45,17 @@ public class Follower extends AbstractRaftActorBehavior {
     private static final int SYNC_THRESHOLD = 10;
 
     public Follower(RaftActorContext context) {
     private static final int SYNC_THRESHOLD = 10;
 
     public Follower(RaftActorContext context) {
+        this(context, null);
+    }
+
+    public Follower(RaftActorContext context, String initialLeaderId) {
         super(context, RaftState.Follower);
         super(context, RaftState.Follower);
+        leaderId = initialLeaderId;
 
         initialSyncStatusTracker = new SyncStatusTracker(context.getActor(), getId(), SYNC_THRESHOLD);
 
         if(context.getRaftPolicy().automaticElectionsEnabled()) {
 
         initialSyncStatusTracker = new SyncStatusTracker(context.getActor(), getId(), SYNC_THRESHOLD);
 
         if(context.getRaftPolicy().automaticElectionsEnabled()) {
-            if (context.getPeerIds().isEmpty()) {
+            if (context.getPeerIds().isEmpty() && getLeaderId() == null) {
                 actor().tell(ELECTION_TIMEOUT, actor());
             } else {
                 scheduleElection(electionDuration());
                 actor().tell(ELECTION_TIMEOUT, actor());
             } else {
                 scheduleElection(electionDuration());
@@ -346,6 +351,8 @@ public class Follower extends AbstractRaftActorBehavior {
                     logName(), installSnapshot.getLeaderId(), installSnapshot.getData().size(),
                     installSnapshot.getChunkIndex(), installSnapshot.getTotalChunks());
 
                     logName(), installSnapshot.getLeaderId(), installSnapshot.getData().size(),
                     installSnapshot.getChunkIndex(), installSnapshot.getTotalChunks());
 
+        leaderId = installSnapshot.getLeaderId();
+
         if(snapshotTracker == null){
             snapshotTracker = new SnapshotTracker(LOG, installSnapshot.getTotalChunks());
         }
         if(snapshotTracker == null){
             snapshotTracker = new SnapshotTracker(LOG, installSnapshot.getTotalChunks());
         }
index a2f9e78769420e5675183fa54c1fc6b123b4c21d..f2903983e930490a37478fda36e2f64b2fa70a42 100644 (file)
@@ -60,13 +60,15 @@ public class RaftActorContextImplTest extends AbstractActorTest {
 
         PeerAddressResolver mockResolver = mock(PeerAddressResolver.class);
         doReturn("peerAddress2").when(mockResolver).resolve("peer2");
 
         PeerAddressResolver mockResolver = mock(PeerAddressResolver.class);
         doReturn("peerAddress2").when(mockResolver).resolve("peer2");
+        doReturn("peerAddress3").when(mockResolver).resolve("peer3");
         configParams.setPeerAddressResolver(mockResolver);
 
         assertEquals("getPeerAddress", "peerAddress2", context.getPeerAddress("peer2"));
         configParams.setPeerAddressResolver(mockResolver);
 
         assertEquals("getPeerAddress", "peerAddress2", context.getPeerAddress("peer2"));
+        assertEquals("getPeerAddress", "peerAddress3", context.getPeerAddress("peer3"));
 
         reset(mockResolver);
 
         reset(mockResolver);
-        assertEquals("getPeerAddress", "peerAddress2", context.getPeerAddress("peer2"));
         assertEquals("getPeerAddress", "peerAddress1", context.getPeerAddress("peer1"));
         assertEquals("getPeerAddress", "peerAddress1", context.getPeerAddress("peer1"));
+        assertEquals("getPeerAddress", "peerAddress2", context.getPeerAddress("peer2"));
         verify(mockResolver, never()).resolve(anyString());
     }
 
         verify(mockResolver, never()).resolve(anyString());
     }
 
index 98a6090514c9549f2f506c82a85fce7376e35cf6..7dcc52028cc28f7b3211565478be382845b3b2f5 100644 (file)
@@ -36,6 +36,7 @@ import com.google.common.collect.Sets;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -44,9 +45,11 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
 import org.opendaylight.controller.cluster.datastore.config.Configuration;
 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
 import org.opendaylight.controller.cluster.datastore.config.Configuration;
 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
+import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
@@ -56,7 +59,6 @@ import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.Sha
 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
-import org.opendaylight.controller.cluster.datastore.messages.CreateShardReply;
 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
@@ -134,6 +136,10 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     private DatastoreSnapshot restoreFromSnapshot;
 
 
     private DatastoreSnapshot restoreFromSnapshot;
 
+    private final Set<String> shardReplicaOperationsInProgress = new HashSet<>();
+
+    private final String id;
+
     /**
      */
     protected ShardManager(Builder builder) {
     /**
      */
     protected ShardManager(Builder builder) {
@@ -148,6 +154,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         this.primaryShardInfoCache = builder.primaryShardInfoCache;
         this.restoreFromSnapshot = builder.restoreFromSnapshot;
 
         this.primaryShardInfoCache = builder.primaryShardInfoCache;
         this.restoreFromSnapshot = builder.restoreFromSnapshot;
 
+        id = "shard-manager-" + type;
+
         peerAddressResolver = new ShardPeerAddressResolver(type, cluster.getCurrentMemberName());
 
         // Subscribe this actor to cluster member events
         peerAddressResolver = new ShardPeerAddressResolver(type, cluster.getCurrentMemberName());
 
         // Subscribe this actor to cluster member events
@@ -252,47 +260,66 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     private void onCreateShard(CreateShard createShard) {
         Object reply;
         try {
     private void onCreateShard(CreateShard createShard) {
         Object reply;
         try {
-            ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig();
-            if(localShards.containsKey(moduleShardConfig.getShardName())) {
-                throw new IllegalStateException(String.format("Shard with name %s already exists",
-                        moduleShardConfig.getShardName()));
+            String shardName = createShard.getModuleShardConfig().getShardName();
+            if(localShards.containsKey(shardName)) {
+                reply = new akka.actor.Status.Success(String.format("Shard with name %s already exists", shardName));
+            } else {
+                doCreateShard(createShard);
+                reply = new akka.actor.Status.Success(null);
             }
             }
+        } catch (Exception e) {
+            LOG.error("onCreateShard failed", e);
+            reply = new akka.actor.Status.Failure(e);
+        }
 
 
-            configuration.addModuleShardConfiguration(moduleShardConfig);
+        if(getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
+            getSender().tell(reply, getSelf());
+        }
+    }
 
 
-            ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), moduleShardConfig.getShardName());
-            Map<String, String> peerAddresses = getPeerAddresses(moduleShardConfig.getShardName()/*,
-                    moduleShardConfig.getShardMemberNames()*/);
+    private void doCreateShard(CreateShard createShard) {
+        ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig();
+        String shardName = moduleShardConfig.getShardName();
 
 
-            LOG.debug("onCreateShard: shardId: {}, memberNames: {}. peerAddresses: {}", shardId,
-                    moduleShardConfig.getShardMemberNames(), peerAddresses);
+        configuration.addModuleShardConfiguration(moduleShardConfig);
 
 
-            DatastoreContext shardDatastoreContext = createShard.getDatastoreContext();
-            if(shardDatastoreContext == null) {
-                shardDatastoreContext = newShardDatastoreContext(moduleShardConfig.getShardName());
-            } else {
-                shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).shardPeerAddressResolver(
-                        peerAddressResolver).build();
-            }
+        DatastoreContext shardDatastoreContext = createShard.getDatastoreContext();
+        if(shardDatastoreContext == null) {
+            shardDatastoreContext = newShardDatastoreContext(shardName);
+        } else {
+            shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).shardPeerAddressResolver(
+                    peerAddressResolver).build();
+        }
 
 
-            ShardInformation info = new ShardInformation(moduleShardConfig.getShardName(), shardId, peerAddresses,
-                    shardDatastoreContext, createShard.getShardBuilder(), peerAddressResolver);
-            localShards.put(info.getShardName(), info);
+        ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
 
 
-            mBean.addLocalShard(shardId.toString());
+        Map<String, String> peerAddresses;
+        boolean isActiveMember;
+        if(configuration.getMembersFromShardName(shardName).contains(cluster.getCurrentMemberName())) {
+            peerAddresses = getPeerAddresses(shardName);
+            isActiveMember = true;
+        } else {
+            // The local member is not in the given shard member configuration. In this case we'll create
+            // the shard with no peers and with elections disabled so it stays as follower. A
+            // subsequent AddServer request will be needed to make it an active member.
+            isActiveMember = false;
+            peerAddresses = Collections.emptyMap();
+            shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).
+                    customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).build();
+        }
 
 
-            if(schemaContext != null) {
-                info.setActor(newShardActor(schemaContext, info));
-            }
+        LOG.debug("onCreateShard: shardId: {}, memberNames: {}. peerAddresses: {}", shardId,
+                moduleShardConfig.getShardMemberNames(), peerAddresses);
 
 
-            reply = new CreateShardReply();
-        } catch (Exception e) {
-            LOG.error("onCreateShard failed", e);
-            reply = new akka.actor.Status.Failure(e);
-        }
+        ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses,
+                shardDatastoreContext, createShard.getShardBuilder(), peerAddressResolver);
+        info.setActiveMember(isActiveMember);
+        localShards.put(info.getShardName(), info);
 
 
-        if(getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
-            getSender().tell(reply, getSelf());
+        mBean.addLocalShard(shardId.toString());
+
+        if(schemaContext != null) {
+            info.setActor(newShardActor(schemaContext, info));
         }
     }
 
         }
     }
 
@@ -677,7 +704,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
         // First see if the there is a local replica for the shard
         final ShardInformation info = localShards.get(shardName);
 
         // First see if the there is a local replica for the shard
         final ShardInformation info = localShards.get(shardName);
-        if (info != null) {
+        if (info != null && info.isActiveMember()) {
             sendResponse(info, message.isWaitUntilReady(), true, new Supplier<Object>() {
                 @Override
                 public Object get() {
             sendResponse(info, message.isWaitUntilReady(), true, new Supplier<Object>() {
                 @Override
                 public Object get() {
@@ -791,7 +818,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     @Override
     public String persistenceId() {
 
     @Override
     public String persistenceId() {
-        return "shard-manager-" + type;
+        return id;
     }
 
     @VisibleForTesting
     }
 
     @VisibleForTesting
@@ -799,22 +826,22 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         return mBean;
     }
 
         return mBean;
     }
 
-    private void checkLocalShardExists(final String shardName, final ActorRef sender) {
-        if (localShards.containsKey(shardName)) {
-            String msg = String.format("Local shard %s already exists", shardName);
+    private boolean isShardReplicaOperationInProgress(final String shardName, final ActorRef sender) {
+        if (shardReplicaOperationsInProgress.contains(shardName)) {
+            String msg = String.format("A shard replica operation for %s is already in progress", shardName);
             LOG.debug ("{}: {}", persistenceId(), msg);
             LOG.debug ("{}: {}", persistenceId(), msg);
-            sender.tell(new akka.actor.Status.Failure(new IllegalArgumentException(msg)), getSelf());
+            sender.tell(new akka.actor.Status.Failure(new IllegalStateException(msg)), getSelf());
+            return true;
         }
         }
+
+        return false;
     }
 
     private void onAddShardReplica (AddShardReplica shardReplicaMsg) {
         final String shardName = shardReplicaMsg.getShardName();
 
     }
 
     private void onAddShardReplica (AddShardReplica shardReplicaMsg) {
         final String shardName = shardReplicaMsg.getShardName();
 
-        // verify the local shard replica is already available in the controller node
         LOG.debug ("onAddShardReplica: {}", shardReplicaMsg);
 
         LOG.debug ("onAddShardReplica: {}", shardReplicaMsg);
 
-        checkLocalShardExists(shardName, getSender());
-
         // verify the shard with the specified name is present in the cluster configuration
         if (!(this.configuration.isShardConfigured(shardName))) {
             String msg = String.format("No module configuration exists for shard %s", shardName);
         // verify the shard with the specified name is present in the cluster configuration
         if (!(this.configuration.isShardConfigured(shardName))) {
             String msg = String.format("No module configuration exists for shard %s", shardName);
@@ -832,95 +859,114 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             return;
         }
 
             return;
         }
 
-        Map<String, String> peerAddresses = getPeerAddresses(shardName);
-        if (peerAddresses.isEmpty()) {
-            String msg = String.format("Cannot add replica for shard %s because no peer is available", shardName);
-            LOG.debug ("{}: {}", persistenceId(), msg);
-            getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(msg)), getSelf());
-            return;
-        }
-
         Timeout findPrimaryTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext().
                 getShardInitializationTimeout().duration().$times(2));
 
         final ActorRef sender = getSender();
         Timeout findPrimaryTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext().
                 getShardInitializationTimeout().duration().$times(2));
 
         final ActorRef sender = getSender();
-        Future<Object> futureObj = ask(getSelf(), new RemoteFindPrimary(shardName, true), findPrimaryTimeout);
+        Future<Object> futureObj = ask(getSelf(), new FindPrimary(shardName, true), findPrimaryTimeout);
         futureObj.onComplete(new OnComplete<Object>() {
             @Override
             public void onComplete(Throwable failure, Object response) {
                 if (failure != null) {
                     LOG.debug ("{}: Received failure from FindPrimary for shard {}", persistenceId(), shardName, failure);
                     sender.tell(new akka.actor.Status.Failure(new RuntimeException(
         futureObj.onComplete(new OnComplete<Object>() {
             @Override
             public void onComplete(Throwable failure, Object response) {
                 if (failure != null) {
                     LOG.debug ("{}: Received failure from FindPrimary for shard {}", persistenceId(), shardName, failure);
                     sender.tell(new akka.actor.Status.Failure(new RuntimeException(
-                        String.format("Failed to find leader for shard %s", shardName), failure)),
-                        getSelf());
+                        String.format("Failed to find leader for shard %s", shardName), failure)), getSelf());
                 } else {
                 } else {
-                    if (!(response instanceof RemotePrimaryShardFound)) {
+                    if(response instanceof RemotePrimaryShardFound) {
+                        RemotePrimaryShardFound message = (RemotePrimaryShardFound)response;
+                        addShard (shardName, message, sender);
+                    } else if(response instanceof LocalPrimaryShardFound) {
+                        sendLocalReplicaAlreadyExistsReply(shardName, sender);
+                    } else {
                         String msg = String.format("Failed to find leader for shard %s: received response: %s",
                                 shardName, response);
                         LOG.debug ("{}: {}", persistenceId(), msg);
                         String msg = String.format("Failed to find leader for shard %s: received response: %s",
                                 shardName, response);
                         LOG.debug ("{}: {}", persistenceId(), msg);
-                        sender.tell(new akka.actor.Status.Failure(new RuntimeException(msg)), getSelf());
-                        return;
+                        sender.tell(new akka.actor.Status.Failure(response instanceof Throwable ? (Throwable)response :
+                            new RuntimeException(msg)), getSelf());
                     }
                     }
-
-                    RemotePrimaryShardFound message = (RemotePrimaryShardFound)response;
-                    addShard (shardName, message, sender);
                 }
             }
         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
     }
 
                 }
             }
         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
     }
 
+    private void sendLocalReplicaAlreadyExistsReply(String shardName, ActorRef sender) {
+        String msg = String.format("Local shard %s already exists", shardName);
+        LOG.debug ("{}: {}", persistenceId(), msg);
+        sender.tell(new akka.actor.Status.Failure(new AlreadyExistsException(msg)), getSelf());
+    }
+
     private void addShard(final String shardName, final RemotePrimaryShardFound response, final ActorRef sender) {
     private void addShard(final String shardName, final RemotePrimaryShardFound response, final ActorRef sender) {
-        checkLocalShardExists(shardName, sender);
+        if(isShardReplicaOperationInProgress(shardName, sender)) {
+            return;
+        }
 
 
-        ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
-        String localShardAddress = peerAddressResolver.getShardActorAddress(shardName, cluster.getCurrentMemberName());
+        shardReplicaOperationsInProgress.add(shardName);
+
+        final ShardInformation shardInfo;
+        final boolean removeShardOnFailure;
+        ShardInformation existingShardInfo = localShards.get(shardName);
+        if(existingShardInfo == null) {
+            removeShardOnFailure = true;
+            ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
+
+            DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).customRaftPolicyImplementation(
+                    DisableElectionsRaftPolicy.class.getName()).build();
 
 
-        DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).customRaftPolicyImplementation(
-                DisableElectionsRaftPolicy.class.getName()).build();
+            shardInfo = new ShardInformation(shardName, shardId, getPeerAddresses(shardName), datastoreContext,
+                    Shard.builder(), peerAddressResolver);
+            shardInfo.setActiveMember(false);
+            localShards.put(shardName, shardInfo);
+            shardInfo.setActor(newShardActor(schemaContext, shardInfo));
+        } else {
+            removeShardOnFailure = false;
+            shardInfo = existingShardInfo;
+        }
 
 
-        final ShardInformation shardInfo = new ShardInformation(shardName, shardId,
-                          getPeerAddresses(shardName), datastoreContext,
-                          Shard.builder(), peerAddressResolver);
-        shardInfo.setShardActiveMember(false);
-        localShards.put(shardName, shardInfo);
-        shardInfo.setActor(newShardActor(schemaContext, shardInfo));
+        String localShardAddress = peerAddressResolver.getShardActorAddress(shardName, cluster.getCurrentMemberName());
 
         //inform ShardLeader to add this shard as a replica by sending an AddServer message
         LOG.debug ("{}: Sending AddServer message to peer {} for shard {}", persistenceId(),
 
         //inform ShardLeader to add this shard as a replica by sending an AddServer message
         LOG.debug ("{}: Sending AddServer message to peer {} for shard {}", persistenceId(),
-                response.getPrimaryPath(), shardId);
+                response.getPrimaryPath(), shardInfo.getShardId());
 
 
-        Timeout addServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration().$times(4));
+        Timeout addServerTimeout = new Timeout(shardInfo.getDatastoreContext().getShardLeaderElectionTimeout().
+                duration());
         Future<Object> futureObj = ask(getContext().actorSelection(response.getPrimaryPath()),
         Future<Object> futureObj = ask(getContext().actorSelection(response.getPrimaryPath()),
-            new AddServer(shardId.toString(), localShardAddress, true), addServerTimeout);
+            new AddServer(shardInfo.getShardId().toString(), localShardAddress, true), addServerTimeout);
 
         futureObj.onComplete(new OnComplete<Object>() {
             @Override
             public void onComplete(Throwable failure, Object addServerResponse) {
 
         futureObj.onComplete(new OnComplete<Object>() {
             @Override
             public void onComplete(Throwable failure, Object addServerResponse) {
+                shardReplicaOperationsInProgress.remove(shardName);
                 if (failure != null) {
                     LOG.debug ("{}: AddServer request to {} for {} failed", persistenceId(),
                             response.getPrimaryPath(), shardName, failure);
 
                 if (failure != null) {
                     LOG.debug ("{}: AddServer request to {} for {} failed", persistenceId(),
                             response.getPrimaryPath(), shardName, failure);
 
-                    // Remove the shard
-                    localShards.remove(shardName);
-                    if (shardInfo.getActor() != null) {
-                        shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf());
-                    }
-
-                    sender.tell(new akka.actor.Status.Failure(new RuntimeException(
-                        String.format("AddServer request to leader %s for shard %s failed",
-                            response.getPrimaryPath(), shardName), failure)), getSelf());
+                    onAddServerFailure(shardName, String.format("AddServer request to leader %s for shard %s failed",
+                            response.getPrimaryPath(), shardName), failure, sender, removeShardOnFailure);
                 } else {
                     AddServerReply reply = (AddServerReply)addServerResponse;
                 } else {
                     AddServerReply reply = (AddServerReply)addServerResponse;
-                    onAddServerReply(shardName, shardInfo, reply, sender, response.getPrimaryPath());
+                    onAddServerReply(shardInfo, reply, sender, response.getPrimaryPath(), removeShardOnFailure);
                 }
             }
                 }
             }
-        }, new Dispatchers(context().system().dispatchers()).
-            getDispatcher(Dispatchers.DispatcherType.Client));
-        return;
+        }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+    }
+
+    private void onAddServerFailure(String shardName, String message, Throwable failure, ActorRef sender,
+            boolean removeShardOnFailure) {
+        if(removeShardOnFailure) {
+            ShardInformation shardInfo = localShards.remove(shardName);
+            if (shardInfo.getActor() != null) {
+                shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf());
+            }
+        }
+
+        sender.tell(new akka.actor.Status.Failure(message == null ? failure :
+            new RuntimeException(message, failure)), getSelf());
     }
 
     }
 
-    private void onAddServerReply (String shardName, ShardInformation shardInfo,
-                                   AddServerReply replyMsg, ActorRef sender, String leaderPath) {
+    private void onAddServerReply(ShardInformation shardInfo, AddServerReply replyMsg, ActorRef sender,
+            String leaderPath, boolean removeShardOnFailure) {
+        String shardName = shardInfo.getShardName();
         LOG.debug ("{}: Received {} for shard {} from leader {}", persistenceId(), replyMsg, shardName, leaderPath);
 
         if (replyMsg.getStatus() == ServerChangeStatus.OK) {
         LOG.debug ("{}: Received {} for shard {} from leader {}", persistenceId(), replyMsg, shardName, leaderPath);
 
         if (replyMsg.getStatus() == ServerChangeStatus.OK) {
@@ -928,35 +974,35 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
             // Make the local shard voting capable
             shardInfo.setDatastoreContext(newShardDatastoreContext(shardName), getSelf());
 
             // Make the local shard voting capable
             shardInfo.setDatastoreContext(newShardDatastoreContext(shardName), getSelf());
-            shardInfo.setShardActiveMember(true);
+            shardInfo.setActiveMember(true);
             persistShardList();
 
             mBean.addLocalShard(shardInfo.getShardId().toString());
             persistShardList();
 
             mBean.addLocalShard(shardInfo.getShardId().toString());
-            sender.tell(new akka.actor.Status.Success(true), getSelf());
+            sender.tell(new akka.actor.Status.Success(null), getSelf());
+        } else if(replyMsg.getStatus() == ServerChangeStatus.ALREADY_EXISTS) {
+            sendLocalReplicaAlreadyExistsReply(shardName, sender);
         } else {
         } else {
-            LOG.warn ("{}: Leader failed to add shard replica {} with status {} - removing the local shard",
+            LOG.warn ("{}: Leader failed to add shard replica {} with status {}",
                     persistenceId(), shardName, replyMsg.getStatus());
 
                     persistenceId(), shardName, replyMsg.getStatus());
 
-            //remove the local replica created
-            localShards.remove(shardName);
-            if (shardInfo.getActor() != null) {
-                shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf());
-            }
+            Exception failure;
             switch (replyMsg.getStatus()) {
                 case TIMEOUT:
             switch (replyMsg.getStatus()) {
                 case TIMEOUT:
-                    sender.tell(new akka.actor.Status.Failure(new RuntimeException(
-                        String.format("The shard leader %s timed out trying to replicate the initial data to the new shard %s. Possible causes - there was a problem replicating the data or shard leadership changed while replicating the shard data",
-                            leaderPath, shardName))), getSelf());
+                    failure = new TimeoutException(String.format(
+                            "The shard leader %s timed out trying to replicate the initial data to the new shard %s." +
+                            "Possible causes - there was a problem replicating the data or shard leadership changed while replicating the shard data",
+                            leaderPath, shardName));
                     break;
                 case NO_LEADER:
                     break;
                 case NO_LEADER:
-                    sender.tell(new akka.actor.Status.Failure(new RuntimeException(String.format(
-                        "There is no shard leader available for shard %s", shardName))), getSelf());
+                    failure = createNoShardLeaderException(shardInfo.getShardId());
                     break;
                 default :
                     break;
                 default :
-                    sender.tell(new akka.actor.Status.Failure(new RuntimeException(String.format(
-                        "AddServer request to leader %s for shard %s failed with status %s",
-                        leaderPath, shardName, replyMsg.getStatus()))), getSelf());
+                    failure = new RuntimeException(String.format(
+                            "AddServer request to leader %s for shard %s failed with status %s",
+                            leaderPath, shardName, replyMsg.getStatus()));
             }
             }
+
+            onAddServerFailure(shardName, null, failure, sender, removeShardOnFailure);
         }
     }
 
         }
     }
 
@@ -976,9 +1022,9 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     }
 
     private void persistShardList() {
     }
 
     private void persistShardList() {
-        List<String> shardList = new ArrayList(localShards.keySet());
+        List<String> shardList = new ArrayList<>(localShards.keySet());
         for (ShardInformation shardInfo : localShards.values()) {
         for (ShardInformation shardInfo : localShards.values()) {
-            if (!shardInfo.isShardActiveMember()) {
+            if (!shardInfo.isActiveMember()) {
                 shardList.remove(shardInfo.getShardName());
             }
         }
                 shardList.remove(shardInfo.getShardName());
             }
         }
@@ -1031,7 +1077,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         private DatastoreContext datastoreContext;
         private Shard.AbstractBuilder<?, ?> builder;
         private final ShardPeerAddressResolver addressResolver;
         private DatastoreContext datastoreContext;
         private Shard.AbstractBuilder<?, ?> builder;
         private final ShardPeerAddressResolver addressResolver;
-        private boolean shardActiveStatus = true;
+        private boolean isActiveMember = true;
 
         private ShardInformation(String shardName, ShardIdentifier shardId,
                 Map<String, String> initialPeerAddresses, DatastoreContext datastoreContext,
 
         private ShardInformation(String shardName, ShardIdentifier shardId,
                 Map<String, String> initialPeerAddresses, DatastoreContext datastoreContext,
@@ -1231,12 +1277,12 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             this.leaderVersion = leaderVersion;
         }
 
             this.leaderVersion = leaderVersion;
         }
 
-        void setShardActiveMember(boolean flag) {
-            shardActiveStatus = flag;
+        boolean isActiveMember() {
+            return isActiveMember;
         }
 
         }
 
-        boolean isShardActiveMember() {
-            return shardActiveStatus;
+        void setActiveMember(boolean isActiveMember) {
+            this.isActiveMember = isActiveMember;
         }
     }
 
         }
     }
 
index 1f31d122cd271ac4824bb1d490c579233d1d51cd..bd07dc550a9f5397765ba5c9743d524f9ddb299a 100644 (file)
@@ -244,7 +244,12 @@ class EntityOwnershipShard extends Shard {
     protected void onStateChanged() {
         super.onStateChanged();
 
     protected void onStateChanged() {
         super.onStateChanged();
 
-        commitCoordinator.onStateChanged(this, isLeader());
+        boolean isLeader = isLeader();
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("{}: onStateChanged: isLeader: {}, hasLeader: {}", persistenceId(), isLeader, hasLeader());
+        }
+
+        commitCoordinator.onStateChanged(this, isLeader);
     }
 
     @Override
     }
 
     @Override
@@ -5,12 +5,12 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-package org.opendaylight.controller.cluster.datastore.messages;
+package org.opendaylight.controller.cluster.datastore.exceptions;
 
 
-/**
- * Reply message for CreateShard.
- *
- * @author Thomas Pantelis
- */
-public class CreateShardReply {
+public class AlreadyExistsException extends RuntimeException {
+    private static final long serialVersionUID = 1L;
+
+    public AlreadyExistsException(String message) {
+        super(message);
+    }
 }
 }
index b6ea14ff95e75653d6c6d2ae4ce559261a75674f..2072af68d42b30775a68241eba6d05b1a05c2607 100644 (file)
@@ -24,15 +24,18 @@ import akka.actor.PoisonPill;
 import akka.actor.Props;
 import akka.actor.Status;
 import akka.actor.Status.Failure;
 import akka.actor.Props;
 import akka.actor.Status;
 import akka.actor.Status.Failure;
+import akka.actor.Status.Success;
 import akka.cluster.Cluster;
 import akka.cluster.ClusterEvent;
 import akka.dispatch.Dispatchers;
 import akka.japi.Creator;
 import akka.pattern.Patterns;
 import akka.persistence.RecoveryCompleted;
 import akka.cluster.Cluster;
 import akka.cluster.ClusterEvent;
 import akka.dispatch.Dispatchers;
 import akka.japi.Creator;
 import akka.pattern.Patterns;
 import akka.persistence.RecoveryCompleted;
+import akka.serialization.Serialization;
 import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
 import akka.util.Timeout;
 import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
 import akka.util.Timeout;
+import com.google.common.base.Function;
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
@@ -52,6 +55,7 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -62,6 +66,7 @@ import org.opendaylight.controller.cluster.datastore.config.Configuration;
 import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
 import org.opendaylight.controller.cluster.datastore.config.EmptyModuleShardConfigProvider;
 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
 import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
 import org.opendaylight.controller.cluster.datastore.config.EmptyModuleShardConfigProvider;
 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
+import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
@@ -70,7 +75,6 @@ import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIde
 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
-import org.opendaylight.controller.cluster.datastore.messages.CreateShardReply;
 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot.ShardSnapshot;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot.ShardSnapshot;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
@@ -99,6 +103,7 @@ import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
 import org.opendaylight.controller.cluster.raft.messages.AddServer;
 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
 import org.opendaylight.controller.cluster.raft.messages.AddServer;
 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
+import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
@@ -113,7 +118,7 @@ public class ShardManagerTest extends AbstractActorTest {
     private static int ID_COUNTER = 1;
 
     private final String shardMrgIDSuffix = "config" + ID_COUNTER++;
     private static int ID_COUNTER = 1;
 
     private final String shardMrgIDSuffix = "config" + ID_COUNTER++;
-    private final String shardMgrID = "shard-manager-" + shardMrgIDSuffix;
+    private final String shardMgrID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
 
     @Mock
     private static CountDownLatch ready;
 
     @Mock
     private static CountDownLatch ready;
@@ -851,24 +856,22 @@ public class ShardManagerTest extends AbstractActorTest {
 
     @Test
     public void testOnRecoveryJournalIsCleaned() {
 
     @Test
     public void testOnRecoveryJournalIsCleaned() {
-        InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules(
+        String persistenceID = "shard-manager-" + shardMrgIDSuffix;
+        InMemoryJournal.addEntry(persistenceID, 1L, new ShardManager.SchemaContextModules(
                 ImmutableSet.of("foo")));
                 ImmutableSet.of("foo")));
-        InMemoryJournal.addEntry(shardMgrID, 2L, new ShardManager.SchemaContextModules(
+        InMemoryJournal.addEntry(persistenceID, 2L, new ShardManager.SchemaContextModules(
                 ImmutableSet.of("bar")));
                 ImmutableSet.of("bar")));
-        InMemoryJournal.addDeleteMessagesCompleteLatch(shardMgrID);
+        InMemoryJournal.addDeleteMessagesCompleteLatch(persistenceID);
 
 
-        new JavaTestKit(getSystem()) {{
-            TestActorRef<TestShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
+        TestShardManager shardManager = newTestShardManager();
 
 
-            shardManager.underlyingActor().waitForRecoveryComplete();
-            InMemoryJournal.waitForDeleteMessagesComplete(shardMgrID);
+        InMemoryJournal.waitForDeleteMessagesComplete(persistenceID);
 
 
-            // Journal entries up to the last one should've been deleted
-            Map<Long, Object> journal = InMemoryJournal.get(shardMgrID);
-            synchronized (journal) {
-                assertEquals("Journal size", 0, journal.size());
-            }
-        }};
+        // Journal entries up to the last one should've been deleted
+        Map<Long, Object> journal = InMemoryJournal.get(persistenceID);
+        synchronized (journal) {
+            assertEquals("Journal size", 0, journal.size());
+        }
     }
 
     @Test
     }
 
     @Test
@@ -1056,7 +1059,7 @@ public class ShardManagerTest extends AbstractActorTest {
     }
 
     @Test
     }
 
     @Test
-    public void testOnReceiveCreateShard() {
+    public void testOnCreateShard() {
         new JavaTestKit(getSystem()) {{
             datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true);
 
         new JavaTestKit(getSystem()) {{
             datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true);
 
@@ -1074,7 +1077,7 @@ public class ShardManagerTest extends AbstractActorTest {
                     "foo", null, Arrays.asList("member-1", "member-5", "member-6"));
             shardManager.tell(new CreateShard(config, shardBuilder, datastoreContext), getRef());
 
                     "foo", null, Arrays.asList("member-1", "member-5", "member-6"));
             shardManager.tell(new CreateShard(config, shardBuilder, datastoreContext), getRef());
 
-            expectMsgClass(duration("5 seconds"), CreateShardReply.class);
+            expectMsgClass(duration("5 seconds"), Success.class);
 
             shardManager.tell(new FindLocalShard("foo", true), getRef());
 
 
             shardManager.tell(new FindLocalShard("foo", true), getRef());
 
@@ -1090,16 +1093,43 @@ public class ShardManagerTest extends AbstractActorTest {
                     shardBuilder.getId());
             assertSame("schemaContext", schemaContext, shardBuilder.getSchemaContext());
 
                     shardBuilder.getId());
             assertSame("schemaContext", schemaContext, shardBuilder.getSchemaContext());
 
-            // Send CreateShard with same name - should fail.
+            // Send CreateShard with same name - should return Success with a message.
+
+            shardManager.tell(new CreateShard(config, shardBuilder, null), getRef());
+
+            Success success = expectMsgClass(duration("5 seconds"), Success.class);
+            assertNotNull("Success status is null", success.status());
+        }};
+    }
+
+    @Test
+    public void testOnCreateShardWithLocalMemberNotInShardConfig() {
+        new JavaTestKit(getSystem()) {{
+            datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true);
+
+            ActorRef shardManager = getSystem().actorOf(newShardMgrProps(
+                    new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
+
+            shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), ActorRef.noSender());
+
+            Shard.Builder shardBuilder = Shard.builder();
+            ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
+                    "foo", null, Arrays.asList("member-5", "member-6"));
 
             shardManager.tell(new CreateShard(config, shardBuilder, null), getRef());
 
             shardManager.tell(new CreateShard(config, shardBuilder, null), getRef());
+            expectMsgClass(duration("5 seconds"), Success.class);
 
 
-            expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
+            shardManager.tell(new FindLocalShard("foo", true), getRef());
+            expectMsgClass(duration("5 seconds"), LocalShardFound.class);
+
+            assertEquals("peerMembers size", 0, shardBuilder.getPeerAddresses().size());
+            assertEquals("schemaContext", DisableElectionsRaftPolicy.class.getName(),
+                    shardBuilder.getDatastoreContext().getShardRaftConfig().getCustomRaftPolicyImplementationClass());
         }};
     }
 
     @Test
         }};
     }
 
     @Test
-    public void testOnReceiveCreateShardWithNoInitialSchemaContext() {
+    public void testOnCreateShardWithNoInitialSchemaContext() {
         new JavaTestKit(getSystem()) {{
             ActorRef shardManager = getSystem().actorOf(newShardMgrProps(
                     new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
         new JavaTestKit(getSystem()) {{
             ActorRef shardManager = getSystem().actorOf(newShardMgrProps(
                     new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
@@ -1110,7 +1140,7 @@ public class ShardManagerTest extends AbstractActorTest {
                     "foo", null, Arrays.asList("member-1"));
             shardManager.tell(new CreateShard(config, shardBuilder, null), getRef());
 
                     "foo", null, Arrays.asList("member-1"));
             shardManager.tell(new CreateShard(config, shardBuilder, null), getRef());
 
-            expectMsgClass(duration("5 seconds"), CreateShardReply.class);
+            expectMsgClass(duration("5 seconds"), Success.class);
 
             SchemaContext schemaContext = TestModel.createTestContext();
             shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
 
             SchemaContext schemaContext = TestModel.createTestContext();
             shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
@@ -1165,7 +1195,7 @@ public class ShardManagerTest extends AbstractActorTest {
     }
 
     @Test
     }
 
     @Test
-    public void testAddShardReplicaForNonExistentShard() throws Exception {
+    public void testAddShardReplicaForNonExistentShardConfig() throws Exception {
         new JavaTestKit(getSystem()) {{
             ActorRef shardManager = getSystem().actorOf(newShardMgrProps(
                     new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
         new JavaTestKit(getSystem()) {{
             ActorRef shardManager = getSystem().actorOf(newShardMgrProps(
                     new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
@@ -1178,17 +1208,6 @@ public class ShardManagerTest extends AbstractActorTest {
         }};
     }
 
         }};
     }
 
-    @Test
-    public void testAddShardReplicaForAlreadyCreatedShard() throws Exception {
-        new JavaTestKit(getSystem()) {{
-            ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
-            shardManager.tell(new AddShardReplica("default"), getRef());
-            Status.Failure resp = expectMsgClass(duration("2 seconds"), Status.Failure.class);
-            assertEquals("Failure obtained", true,
-                          (resp.cause() instanceof IllegalArgumentException));
-        }};
-    }
-
     @Test
     public void testAddShardReplica() throws Exception {
         MockConfiguration mockConfig =
     @Test
     public void testAddShardReplica() throws Exception {
         MockConfiguration mockConfig =
@@ -1253,35 +1272,167 @@ public class ShardManagerTest extends AbstractActorTest {
     }
 
     @Test
     }
 
     @Test
-    public void testAddShardReplicaWithFindPrimaryTimeout() throws Exception {
-        MockConfiguration mockConfig =
-                new MockConfiguration(ImmutableMap.<String, List<String>>builder().
-                   put("default", Arrays.asList("member-1", "member-2")).
-                   put("astronauts", Arrays.asList("member-2")).build());
+    public void testAddShardReplicaWithPreExistingReplicaInRemoteShardLeader() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            TestActorRef<ForwardingShardManager> shardManager = TestActorRef.create(getSystem(),
+                    newPropsShardMgrWithMockShardActor(), shardMgrID);
 
 
-        String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
+            shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+            shardManager.tell(new ActorInitialized(), mockShardActor);
 
 
-        // Create an ActorSystem ShardManager actor for member-1.
-        final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
-        Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
-        ActorRef mockDefaultShardActor = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
-        final TestActorRef<ForwardingShardManager> newReplicaShardManager = TestActorRef.create(system1,
-                newPropsShardMgrWithMockShardActor("shardManager1", mockDefaultShardActor,
-                   new ClusterWrapperImpl(system1), mockConfig), shardManagerID);
+            String leaderId = "leader-member-shard-default-" + shardMrgIDSuffix;
+            AddServerReply addServerReply = new AddServerReply(ServerChangeStatus.ALREADY_EXISTS, null);
+            ActorRef leaderShardActor = shardManager.underlyingActor().getContext().actorOf(
+                    Props.create(MockRespondActor.class, addServerReply), leaderId);
 
 
-        new JavaTestKit(system1) {{
+            MockClusterWrapper.sendMemberUp(shardManager, "leader-member", leaderShardActor.path().toString());
+
+            String newReplicaId = "member-1-shard-default-" + shardMrgIDSuffix;
+            shardManager.tell(new RoleChangeNotification(newReplicaId,
+                    RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
+            shardManager.tell(new ShardLeaderStateChanged(newReplicaId, leaderId, Optional.<DataTree>absent(),
+                    DataStoreVersions.CURRENT_VERSION), mockShardActor);
+
+            shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef());
+
+            MessageCollectorActor.expectFirstMatching(leaderShardActor, AddServer.class);
+
+            Failure resp = expectMsgClass(duration("5 seconds"), Failure.class);
+            assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
+
+            shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
+            expectMsgClass(duration("5 seconds"), LocalShardFound.class);
+
+            // Send message again to verify previous in progress state is cleared
+
+            shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef());
+            resp = expectMsgClass(duration("5 seconds"), Failure.class);
+            assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
+
+            // Send message again with an AddServer timeout to verify the pre-existing shard actor isn't terminated.
+
+            shardManager.tell(newDatastoreContextFactory(datastoreContextBuilder.
+                    shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build()), getRef());
+            leaderShardActor.tell(MockRespondActor.CLEAR_RESPONSE, ActorRef.noSender());
+            shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef());
+            expectMsgClass(duration("5 seconds"), Failure.class);
+
+            shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
+            expectMsgClass(duration("5 seconds"), LocalShardFound.class);
+
+            leaderShardActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+        }};
+    }
+
+    @Test
+    public void testAddShardReplicaWithPreExistingLocalReplicaLeader() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
+            ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
+
+            shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+            shardManager.tell(new ActorInitialized(), mockShardActor);
+            shardManager.tell(new ShardLeaderStateChanged(memberId, memberId, Optional.of(mock(DataTree.class)),
+                    DataStoreVersions.CURRENT_VERSION), getRef());
+            shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(),
+                    RaftState.Leader.name())), mockShardActor);
+
+            shardManager.tell(new AddShardReplica(Shard.DEFAULT_NAME), getRef());
+            Failure resp = expectMsgClass(duration("5 seconds"), Failure.class);
+            assertEquals("Failure cause", AlreadyExistsException.class, resp.cause().getClass());
+
+            shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
+            expectMsgClass(duration("5 seconds"), LocalShardFound.class);
+        }};
+    }
+
+    @Test
+    public void testAddShardReplicaWithAddServerReplyFailure() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            JavaTestKit mockShardLeaderKit = new JavaTestKit(getSystem());
+
+            MockConfiguration mockConfig =
+                    new MockConfiguration(ImmutableMap.<String, List<String>>builder().
+                       put("astronauts", Arrays.asList("member-2")).build());
+
+            ActorRef mockNewReplicaShardActor = newMockShardActor(getSystem(), "astronauts", "member-1");
+            TestActorRef<ForwardingShardManager> shardManager = TestActorRef.create(getSystem(),
+                    newPropsShardMgrWithMockShardActor("newReplicaShardManager", mockNewReplicaShardActor,
+                            new MockClusterWrapper(), mockConfig), shardMgrID);
+            shardManager.underlyingActor().setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
+
+            shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+
+            JavaTestKit terminateWatcher = new JavaTestKit(getSystem());
+            terminateWatcher.watch(mockNewReplicaShardActor);
+
+            shardManager.tell(new AddShardReplica("astronauts"), getRef());
+
+            AddServer addServerMsg = mockShardLeaderKit.expectMsgClass(AddServer.class);
+            assertEquals("AddServer serverId", "member-1-shard-astronauts-" + shardMrgIDSuffix,
+                    addServerMsg.getNewServerId());
+            mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.TIMEOUT, null));
+
+            Failure failure = expectMsgClass(duration("5 seconds"), Failure.class);
+            assertEquals("Failure cause", TimeoutException.class, failure.cause().getClass());
+
+            shardManager.tell(new FindLocalShard("astronauts", false), getRef());
+            expectMsgClass(duration("5 seconds"), LocalShardNotFound.class);
+
+            terminateWatcher.expectTerminated(mockNewReplicaShardActor);
+
+            shardManager.tell(new AddShardReplica("astronauts"), getRef());
+            mockShardLeaderKit.expectMsgClass(AddServer.class);
+            mockShardLeaderKit.reply(new AddServerReply(ServerChangeStatus.NO_LEADER, null));
+            failure = expectMsgClass(duration("5 seconds"), Failure.class);
+            assertEquals("Failure cause", NoShardLeaderException.class, failure.cause().getClass());
+        }};
+    }
+
+    @Test
+    public void testAddShardReplicaWithAlreadyInProgress() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            JavaTestKit mockShardLeaderKit = new JavaTestKit(getSystem());
+            JavaTestKit secondRequestKit = new JavaTestKit(getSystem());
+
+            MockConfiguration mockConfig =
+                    new MockConfiguration(ImmutableMap.<String, List<String>>builder().
+                       put("astronauts", Arrays.asList("member-2")).build());
+
+            TestActorRef<ForwardingShardManager> shardManager = TestActorRef.create(getSystem(),
+                    newPropsShardMgrWithMockShardActor("newReplicaShardManager", mockShardActor,
+                            new MockClusterWrapper(), mockConfig), shardMgrID);
+            shardManager.underlyingActor().setMessageInterceptor(newFindPrimaryInterceptor(mockShardLeaderKit.getRef()));
+
+            shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
+
+            shardManager.tell(new AddShardReplica("astronauts"), getRef());
+
+            mockShardLeaderKit.expectMsgClass(AddServer.class);
+
+            shardManager.tell(new AddShardReplica("astronauts"), secondRequestKit.getRef());
+
+            secondRequestKit.expectMsgClass(duration("5 seconds"), Failure.class);
+        }};
+    }
+
+    @Test
+    public void testAddShardReplicaWithFindPrimaryTimeout() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
+                       put("astronauts", Arrays.asList("member-2")).build());
+
+            ActorRef newReplicaShardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor(
+                    "shardManager", mockShardActor, new MockClusterWrapper(), mockConfig));
 
             newReplicaShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
             MockClusterWrapper.sendMemberUp(newReplicaShardManager, "member-2", getRef().path().toString());
 
             newReplicaShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
             MockClusterWrapper.sendMemberUp(newReplicaShardManager, "member-2", getRef().path().toString());
-            newReplicaShardManager.underlyingActor().waitForMemberUp();
 
             newReplicaShardManager.tell(new AddShardReplica("astronauts"), getRef());
             Status.Failure resp = expectMsgClass(duration("5 seconds"), Status.Failure.class);
             assertEquals("Failure obtained", true,
                           (resp.cause() instanceof RuntimeException));
         }};
 
             newReplicaShardManager.tell(new AddShardReplica("astronauts"), getRef());
             Status.Failure resp = expectMsgClass(duration("5 seconds"), Status.Failure.class);
             assertEquals("Failure obtained", true,
                           (resp.cause() instanceof RuntimeException));
         }};
-
-        JavaTestKit.shutdownActorSystem(system1);
     }
 
     @Test
     }
 
     @Test
@@ -1308,7 +1459,7 @@ public class ShardManagerTest extends AbstractActorTest {
                    put("people", Arrays.asList("member-1", "member-2")).build());
             String[] restoredShards = {"default", "astronauts"};
             ShardManagerSnapshot snapshot = new ShardManagerSnapshot(Arrays.asList(restoredShards));
                    put("people", Arrays.asList("member-1", "member-2")).build());
             String[] restoredShards = {"default", "astronauts"};
             ShardManagerSnapshot snapshot = new ShardManagerSnapshot(Arrays.asList(restoredShards));
-            InMemorySnapshotStore.addSnapshot(shardMgrID, snapshot);
+            InMemorySnapshotStore.addSnapshot("shard-manager-" + shardMrgIDSuffix, snapshot);
 
             //create shardManager to come up with restored data
             TestActorRef<TestShardManager> newRestoredShardManager = TestActorRef.create(getSystem(),
 
             //create shardManager to come up with restored data
             TestActorRef<TestShardManager> newRestoredShardManager = TestActorRef.create(getSystem(),
@@ -1388,6 +1539,24 @@ public class ShardManagerTest extends AbstractActorTest {
         }
     }
 
         }
     }
 
+    interface MessageInterceptor extends Function<Object, Object> {
+        boolean canIntercept(Object message);
+    }
+
+    private MessageInterceptor newFindPrimaryInterceptor(final ActorRef primaryActor) {
+        return new MessageInterceptor(){
+            @Override
+            public Object apply(Object message) {
+                return new RemotePrimaryShardFound(Serialization.serializedActorPath(primaryActor), (short) 1);
+            }
+
+            @Override
+            public boolean canIntercept(Object message) {
+                return message instanceof FindPrimary;
+            }
+        };
+    }
+
     private static class ForwardingShardManager extends ShardManager {
         private CountDownLatch findPrimaryMessageReceived = new CountDownLatch(1);
         private CountDownLatch memberUpReceived = new CountDownLatch(1);
     private static class ForwardingShardManager extends ShardManager {
         private CountDownLatch findPrimaryMessageReceived = new CountDownLatch(1);
         private CountDownLatch memberUpReceived = new CountDownLatch(1);
@@ -1398,6 +1567,7 @@ public class ShardManagerTest extends AbstractActorTest {
         private final String name;
         private final CountDownLatch snapshotPersist = new CountDownLatch(1);
         private ShardManagerSnapshot snapshot;
         private final String name;
         private final CountDownLatch snapshotPersist = new CountDownLatch(1);
         private ShardManagerSnapshot snapshot;
+        private volatile MessageInterceptor messageInterceptor;
 
         public ForwardingShardManager(Builder builder, String name, ActorRef shardActor) {
             super(builder);
 
         public ForwardingShardManager(Builder builder, String name, ActorRef shardActor) {
             super(builder);
@@ -1405,10 +1575,19 @@ public class ShardManagerTest extends AbstractActorTest {
             this.name = name;
         }
 
             this.name = name;
         }
 
+        void setMessageInterceptor(MessageInterceptor messageInterceptor) {
+            this.messageInterceptor = messageInterceptor;
+        }
+
+
         @Override
         public void handleCommand(Object message) throws Exception {
             try{
         @Override
         public void handleCommand(Object message) throws Exception {
             try{
-                super.handleCommand(message);
+                if(messageInterceptor != null && messageInterceptor.canIntercept(message)) {
+                    getSender().tell(messageInterceptor.apply(message), getSelf());
+                } else {
+                    super.handleCommand(message);
+                }
             } finally {
                 if(message instanceof FindPrimary) {
                     findPrimaryMessageReceived.countDown();
             } finally {
                 if(message instanceof FindPrimary) {
                     findPrimaryMessageReceived.countDown();
@@ -1491,9 +1670,19 @@ public class ShardManagerTest extends AbstractActorTest {
     }
 
     private static class MockRespondActor extends MessageCollectorActor {
     }
 
     private static class MockRespondActor extends MessageCollectorActor {
+        static final String CLEAR_RESPONSE = "clear-response";
 
         private volatile Object responseMsg;
 
 
         private volatile Object responseMsg;
 
+        @SuppressWarnings("unused")
+        public MockRespondActor() {
+        }
+
+        @SuppressWarnings("unused")
+        public MockRespondActor(Object responseMsg) {
+            this.responseMsg = responseMsg;
+        }
+
         public void updateResponse(Object response) {
             responseMsg = response;
         }
         public void updateResponse(Object response) {
             responseMsg = response;
         }
@@ -1504,8 +1693,9 @@ public class ShardManagerTest extends AbstractActorTest {
             if (message instanceof AddServer) {
                 if (responseMsg != null) {
                     getSender().tell(responseMsg, getSelf());
             if (message instanceof AddServer) {
                 if (responseMsg != null) {
                     getSender().tell(responseMsg, getSelf());
-                    responseMsg = null;
                 }
                 }
+            } if(message.equals(CLEAR_RESPONSE)) {
+                responseMsg = null;
             }
         }
     }
             }
         }
     }
index 2ca128271e0497d17446e742d6b847af905d598f..2167bd275d237e7125296e66ef1f9ff749725a11 100644 (file)
@@ -17,11 +17,14 @@ import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.verify;
 import static org.opendaylight.controller.cluster.datastore.entityownership.AbstractEntityOwnershipTest.ownershipChange;
 import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.verify;
 import static org.opendaylight.controller.cluster.datastore.entityownership.AbstractEntityOwnershipTest.ownershipChange;
+import static org.opendaylight.controller.cluster.datastore.entityownership.DistributedEntityOwnershipService.ENTITY_OWNERSHIP_SHARD_NAME;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.CANDIDATE_NAME_NODE_ID;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.entityPath;
 import akka.actor.ActorSystem;
 import akka.actor.Address;
 import akka.actor.AddressFromURIString;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.CANDIDATE_NAME_NODE_ID;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.entityPath;
 import akka.actor.ActorSystem;
 import akka.actor.Address;
 import akka.actor.AddressFromURIString;
+import akka.actor.Status.Failure;
+import akka.actor.Status.Success;
 import akka.cluster.Cluster;
 import akka.testkit.JavaTestKit;
 import com.google.common.base.Function;
 import akka.cluster.Cluster;
 import akka.testkit.JavaTestKit;
 import com.google.common.base.Function;
@@ -44,6 +47,7 @@ import org.opendaylight.controller.cluster.datastore.DatastoreContext;
 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
 import org.opendaylight.controller.cluster.datastore.IntegrationTestKit;
 import org.opendaylight.controller.cluster.datastore.entityownership.selectionstrategy.EntityOwnerSelectionStrategyConfig;
 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
 import org.opendaylight.controller.cluster.datastore.IntegrationTestKit;
 import org.opendaylight.controller.cluster.datastore.entityownership.selectionstrategy.EntityOwnerSelectionStrategyConfig;
+import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
 import org.opendaylight.controller.md.sal.common.api.clustering.CandidateAlreadyRegisteredException;
 import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
 import org.opendaylight.controller.md.sal.common.api.clustering.CandidateAlreadyRegisteredException;
 import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
@@ -66,6 +70,7 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 public class DistributedEntityOwnershipIntegrationTest {
     private static final Address MEMBER_1_ADDRESS = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558");
     private static final String MODULE_SHARDS_CONFIG = "module-shards-default.conf";
 public class DistributedEntityOwnershipIntegrationTest {
     private static final Address MEMBER_1_ADDRESS = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558");
     private static final String MODULE_SHARDS_CONFIG = "module-shards-default.conf";
+    private static final String MODULE_SHARDS_MEMBER_1_CONFIG = "module-shards-default-member-1.conf";
     private static final String ENTITY_TYPE1 = "entityType1";
     private static final String ENTITY_TYPE2 = "entityType2";
     private static final Entity ENTITY1 = new Entity(ENTITY_TYPE1, "entity1");
     private static final String ENTITY_TYPE1 = "entityType1";
     private static final String ENTITY_TYPE2 = "entityType2";
     private static final Entity ENTITY1 = new Entity(ENTITY_TYPE1, "entity1");
@@ -110,32 +115,48 @@ public class DistributedEntityOwnershipIntegrationTest {
     @Before
     public void setUp() {
         MockitoAnnotations.initMocks(this);
     @Before
     public void setUp() {
         MockitoAnnotations.initMocks(this);
+    }
 
 
-        leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
-        Cluster.get(leaderSystem).join(MEMBER_1_ADDRESS);
+    @After
+    public void tearDown() {
+        if(leaderSystem != null) {
+            JavaTestKit.shutdownActorSystem(leaderSystem);
+        }
 
 
-        follower1System = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
-        Cluster.get(follower1System).join(MEMBER_1_ADDRESS);
+        if(follower1System != null) {
+            JavaTestKit.shutdownActorSystem(follower1System);
+        }
+
+        if(follower2System != null) {
+            JavaTestKit.shutdownActorSystem(follower2System);
+        }
+    }
 
 
+    private void startAllSystems() {
+        startLeaderSystem();
+        startFollower1System();
+        startFollower2System();
+    }
+
+    private void startFollower2System() {
         follower2System = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member3"));
         Cluster.get(follower2System).join(MEMBER_1_ADDRESS);
     }
 
         follower2System = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member3"));
         Cluster.get(follower2System).join(MEMBER_1_ADDRESS);
     }
 
-    @After
-    public void tearDown() {
-        JavaTestKit.shutdownActorSystem(leaderSystem);
-        JavaTestKit.shutdownActorSystem(follower1System);
-        JavaTestKit.shutdownActorSystem(follower2System);
+    private void startFollower1System() {
+        follower1System = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
+        Cluster.get(follower1System).join(MEMBER_1_ADDRESS);
+    }
+
+    private void startLeaderSystem() {
+        leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
+        Cluster.get(leaderSystem).join(MEMBER_1_ADDRESS);
     }
 
     private void initDatastores(String type) {
     }
 
     private void initDatastores(String type) {
-        leaderTestKit = new IntegrationTestKit(leaderSystem, leaderDatastoreContextBuilder);
-        leaderDistributedDataStore = leaderTestKit.setupDistributedDataStore(
-                type, MODULE_SHARDS_CONFIG, false, SCHEMA_CONTEXT);
+        initLeaderDatastore(type, MODULE_SHARDS_CONFIG);
 
 
-        follower1TestKit = new IntegrationTestKit(follower1System, followerDatastoreContextBuilder);
-        follower1DistributedDataStore = follower1TestKit.setupDistributedDataStore(
-                type, MODULE_SHARDS_CONFIG, false, SCHEMA_CONTEXT);
+        initFollower1Datastore(type, MODULE_SHARDS_CONFIG);
 
         follower2TestKit = new IntegrationTestKit(follower2System, followerDatastoreContextBuilder);
         follower2DistributedDataStore = follower2TestKit.setupDistributedDataStore(
 
         follower2TestKit = new IntegrationTestKit(follower2System, followerDatastoreContextBuilder);
         follower2DistributedDataStore = follower2TestKit.setupDistributedDataStore(
@@ -145,21 +166,44 @@ public class DistributedEntityOwnershipIntegrationTest {
         follower1DistributedDataStore.waitTillReady();
         follower2DistributedDataStore.waitTillReady();
 
         follower1DistributedDataStore.waitTillReady();
         follower2DistributedDataStore.waitTillReady();
 
-        leaderEntityOwnershipService = new DistributedEntityOwnershipService(leaderDistributedDataStore, EntityOwnerSelectionStrategyConfig.newBuilder().build());
-        leaderEntityOwnershipService.start();
+        startLeaderService();
 
 
-        follower1EntityOwnershipService = new DistributedEntityOwnershipService(follower1DistributedDataStore, EntityOwnerSelectionStrategyConfig.newBuilder().build());
-        follower1EntityOwnershipService.start();
+        startFollower1Service();
 
 
-        follower2EntityOwnershipService = new DistributedEntityOwnershipService(follower2DistributedDataStore, EntityOwnerSelectionStrategyConfig.newBuilder().build());
+        follower2EntityOwnershipService = new DistributedEntityOwnershipService(follower2DistributedDataStore,
+                EntityOwnerSelectionStrategyConfig.newBuilder().build());
         follower2EntityOwnershipService.start();
 
         follower2EntityOwnershipService.start();
 
-        leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
-                DistributedEntityOwnershipService.ENTITY_OWNERSHIP_SHARD_NAME);
+        leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), ENTITY_OWNERSHIP_SHARD_NAME);
+    }
+
+    private void startFollower1Service() {
+        follower1EntityOwnershipService = new DistributedEntityOwnershipService(follower1DistributedDataStore,
+                EntityOwnerSelectionStrategyConfig.newBuilder().build());
+        follower1EntityOwnershipService.start();
+    }
+
+    private void startLeaderService() {
+        leaderEntityOwnershipService = new DistributedEntityOwnershipService(leaderDistributedDataStore,
+                EntityOwnerSelectionStrategyConfig.newBuilder().build());
+        leaderEntityOwnershipService.start();
+    }
+
+    private void initFollower1Datastore(String type, String moduleConfig) {
+        follower1TestKit = new IntegrationTestKit(follower1System, followerDatastoreContextBuilder);
+        follower1DistributedDataStore = follower1TestKit.setupDistributedDataStore(
+                type, moduleConfig, false, SCHEMA_CONTEXT);
+    }
+
+    private void initLeaderDatastore(String type, String moduleConfig) {
+        leaderTestKit = new IntegrationTestKit(leaderSystem, leaderDatastoreContextBuilder);
+        leaderDistributedDataStore = leaderTestKit.setupDistributedDataStore(
+                type, moduleConfig, false, SCHEMA_CONTEXT);
     }
 
     @Test
     public void test() throws Exception {
     }
 
     @Test
     public void test() throws Exception {
+        startAllSystems();
         initDatastores("test");
 
         leaderEntityOwnershipService.registerListener(ENTITY_TYPE1, leaderMockListener);
         initDatastores("test");
 
         leaderEntityOwnershipService.registerListener(ENTITY_TYPE1, leaderMockListener);
@@ -275,6 +319,7 @@ public class DistributedEntityOwnershipIntegrationTest {
      */
     @Test
     public void testCloseCandidateRegistrationInQuickSuccession() throws CandidateAlreadyRegisteredException {
      */
     @Test
     public void testCloseCandidateRegistrationInQuickSuccession() throws CandidateAlreadyRegisteredException {
+        startAllSystems();
         initDatastores("testCloseCandidateRegistrationInQuickSuccession");
 
         leaderEntityOwnershipService.registerListener(ENTITY_TYPE1, leaderMockListener);
         initDatastores("testCloseCandidateRegistrationInQuickSuccession");
 
         leaderEntityOwnershipService.registerListener(ENTITY_TYPE1, leaderMockListener);
@@ -318,6 +363,46 @@ public class DistributedEntityOwnershipIntegrationTest {
         assertFalse(follower2ChangeCaptor.getAllValues().get(follower2ChangeCaptor.getAllValues().size()-1).hasOwner());
     }
 
         assertFalse(follower2ChangeCaptor.getAllValues().get(follower2ChangeCaptor.getAllValues().size()-1).hasOwner());
     }
 
+    /**
+     * Tests bootstrapping the entity-ownership shard when there's no shards initially configured for local
+     * member. The entity-ownership shard is initially created as inactive (ie remains a follower), requiring
+     * an AddShardReplica request to join it to an existing leader.
+     */
+    @Test
+    public void testEntityOwnershipShardBootstrapping() throws Throwable {
+        startLeaderSystem();
+        startFollower1System();
+        String type = "testEntityOwnershipShardBootstrapping";
+        initLeaderDatastore(type, MODULE_SHARDS_MEMBER_1_CONFIG);
+        initFollower1Datastore(type, MODULE_SHARDS_MEMBER_1_CONFIG);
+
+        leaderDistributedDataStore.waitTillReady();
+        follower1DistributedDataStore.waitTillReady();
+
+        startLeaderService();
+        startFollower1Service();
+
+        leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), ENTITY_OWNERSHIP_SHARD_NAME);
+
+        leaderEntityOwnershipService.registerListener(ENTITY_TYPE1, leaderMockListener);
+
+        // Register a candidate for follower1 - should get queued since follower1 has no leader
+        follower1EntityOwnershipService.registerCandidate(ENTITY1);
+        verify(leaderMockListener, timeout(300).never()).ownershipChanged(ownershipChange(ENTITY1));
+
+        // Add replica in follower1
+        AddShardReplica addReplica = new AddShardReplica(ENTITY_OWNERSHIP_SHARD_NAME);
+        follower1DistributedDataStore.getActorContext().getShardManager().tell(addReplica , follower1TestKit.getRef());
+        Object reply = follower1TestKit.expectMsgAnyClassOf(JavaTestKit.duration("5 sec"), Success.class, Failure.class);
+        if(reply instanceof Failure) {
+            throw ((Failure)reply).cause();
+        }
+
+        // The queued candidate registration should proceed
+        verify(leaderMockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY1));
+
+    }
+
     private static void verifyGetOwnershipState(DistributedEntityOwnershipService service, Entity entity,
             boolean isOwner, boolean hasOwner) {
         Optional<EntityOwnershipState> state = service.getOwnershipState(entity);
     private static void verifyGetOwnershipState(DistributedEntityOwnershipService service, Entity entity,
             boolean isOwner, boolean hasOwner) {
         Optional<EntityOwnershipState> state = service.getOwnershipState(entity);
index 75fa9875447b131c8a9cb0d1d947e83778bd193a..100e6dec01250e510b1aa4ac24821fd43243a37c 100644 (file)
@@ -26,7 +26,9 @@ import akka.actor.PoisonPill;
 import akka.actor.Props;
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
 import akka.actor.Props;
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
+import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.Uninterruptibles;
 import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
@@ -86,12 +88,18 @@ public class DistributedEntityOwnershipServiceTest extends AbstractEntityOwnersh
         DatastoreContext datastoreContext = DatastoreContext.newBuilder().dataStoreType(dataStoreType).
                 shardInitializationTimeout(10, TimeUnit.SECONDS).build();
 
         DatastoreContext datastoreContext = DatastoreContext.newBuilder().dataStoreType(dataStoreType).
                 shardInitializationTimeout(10, TimeUnit.SECONDS).build();
 
-        Configuration configuration = new ConfigurationImpl(new ModuleShardConfigProvider() {
+        ModuleShardConfigProvider configProvider = new ModuleShardConfigProvider() {
             @Override
             public Map<String, ModuleConfig> retrieveModuleConfigs(Configuration configuration) {
                 return Collections.emptyMap();
             }
             @Override
             public Map<String, ModuleConfig> retrieveModuleConfigs(Configuration configuration) {
                 return Collections.emptyMap();
             }
-        });
+        };
+        Configuration configuration = new ConfigurationImpl(configProvider) {
+            @Override
+            public Collection<String> getUniqueMemberNamesForAllShards() {
+                return Sets.newHashSet("member-1");
+            }
+        };
 
         DatastoreContextFactory mockContextFactory = Mockito.mock(DatastoreContextFactory.class);
         Mockito.doReturn(datastoreContext).when(mockContextFactory).getBaseDatastoreContext();
 
         DatastoreContextFactory mockContextFactory = Mockito.mock(DatastoreContextFactory.class);
         Mockito.doReturn(datastoreContext).when(mockContextFactory).getBaseDatastoreContext();
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/module-shards-default-member-1.conf b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/module-shards-default-member-1.conf
new file mode 100644 (file)
index 0000000..627d232
--- /dev/null
@@ -0,0 +1,13 @@
+module-shards = [
+    {
+        name = "default"
+        shards = [
+            {
+                name="default",
+                replicas = [
+                    "member-1"
+                ]
+            }
+        ]
+    }
+]