BUG 2187 Implement Remove Shard Replica RPC 99/30999/3
authorMoiz Raja <moraja@cisco.com>
Tue, 8 Dec 2015 17:23:24 +0000 (09:23 -0800)
committerTom Pantelis <tpanteli@brocade.com>
Tue, 15 Dec 2015 11:59:28 +0000 (06:59 -0500)
Change-Id: I8ccdf81903c505ecf77ed9f91b6021aae9fe8d0d
Signed-off-by: Moiz Raja <moraja@cisco.com>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/MemberNode.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java

index 3e14c58..8532b95 100644 (file)
@@ -1154,6 +1154,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     }
 
     private void onRemoveShardReplica (final RemoveShardReplica shardReplicaMsg) {
+        LOG.debug("{}: onRemoveShardReplica: {}", persistenceId(), shardReplicaMsg);
+
         findPrimary(shardReplicaMsg.getShardName(), new AutoFindPrimaryFailureResponseHandler(getSender(),
                 shardReplicaMsg.getShardName(), persistenceId(), getSelf()) {
             @Override
index 976443f..0c0f42a 100644 (file)
@@ -32,6 +32,7 @@ import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshotList;
+import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RpcRegistration;
@@ -120,9 +121,41 @@ public class ClusterAdminRpcService implements ClusterAdminService, AutoCloseabl
 
     @Override
     public Future<RpcResult<Void>> removeShardReplica(RemoveShardReplicaInput input) {
-        // TODO implement
-        return RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, "operation-not-supported",
-                "Not implemented yet").buildFuture();
+        final String shardName = input.getShardName();
+        if(Strings.isNullOrEmpty(shardName)) {
+            return newFailedRpcResultBuilder("A valid shard name must be specified").buildFuture();
+        }
+
+        DataStoreType dataStoreType = input.getDataStoreType();
+        if(dataStoreType == null) {
+            return newFailedRpcResultBuilder("A valid DataStoreType must be specified").buildFuture();
+        }
+
+        final String memberName = input.getMemberName();
+        if(Strings.isNullOrEmpty(memberName)) {
+            return newFailedRpcResultBuilder("A valid member name must be specified").buildFuture();
+        }
+
+        LOG.info("Removing replica for shard {} memberName {}, datastoreType {}", shardName, memberName, dataStoreType);
+
+        final SettableFuture<RpcResult<Void>> returnFuture = SettableFuture.create();
+        ListenableFuture<Success> future = sendMessageToShardManager(dataStoreType,
+                new RemoveShardReplica(shardName, memberName));
+        Futures.addCallback(future, new FutureCallback<Success>() {
+            @Override
+            public void onSuccess(Success success) {
+                LOG.info("Successfully removed replica for shard {}", shardName);
+                returnFuture.set(newSuccessfulResult());
+            }
+
+            @Override
+            public void onFailure(Throwable failure) {
+                onMessageFailure(String.format("Failed to remove replica for shard %s", shardName),
+                        returnFuture, failure);
+            }
+        });
+
+        return returnFuture;
     }
 
     @Override
index e966c95..d5d4ad1 100644 (file)
@@ -7,7 +7,7 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
@@ -17,6 +17,7 @@ import akka.cluster.Cluster;
 import akka.cluster.ClusterEvent.CurrentClusterState;
 import akka.cluster.Member;
 import akka.cluster.MemberStatus;
+import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.Sets;
@@ -144,13 +145,25 @@ public class MemberNode {
         verifyRaftState(datastore, shardName, new RaftStateVerifier() {
             @Override
             public void verify(OnDemandRaftState raftState) {
-                assertTrue(String.format("Peer(s) %s not found for shard %s. Actual: %s", peerIds, shardName,
-                        raftState.getPeerAddresses().keySet()),
-                        raftState.getPeerAddresses().keySet().containsAll(peerIds));
+                assertEquals("Peers for shard " + shardName, peerIds, raftState.getPeerAddresses().keySet());
             }
         });
     }
 
+    public static void verifyNoShardPresent(DistributedDataStore datastore, String shardName) {
+        Stopwatch sw = Stopwatch.createStarted();
+        while(sw.elapsed(TimeUnit.SECONDS) <= 5) {
+            Optional<ActorRef> shardReply = datastore.getActorContext().findLocalShard(shardName);
+            if(!shardReply.isPresent()) {
+                return;
+            }
+
+            Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+        }
+
+        fail("Shard " + shardName + " is present");
+    }
+
     public static class Builder {
         private final List<MemberNode> members;
         private String moduleShardsConfig;
index e117dcc..80aafa2 100644 (file)
@@ -11,6 +11,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.fail;
+import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyNoShardPresent;
 import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyRaftPeersPresent;
 import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyRaftState;
 import akka.actor.ActorRef;
@@ -36,6 +37,7 @@ import org.apache.commons.lang3.SerializationUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.DatastoreContext;
 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
 import org.opendaylight.controller.cluster.datastore.MemberNode;
 import org.opendaylight.controller.cluster.datastore.MemberNode.RaftStateVerifier;
@@ -53,6 +55,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controll
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddShardReplicaInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.BackupDatastoreInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DataStoreType;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.add.replicas._for.all.shards.output.ShardResult;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.add.replicas._for.all.shards.output.ShardResultBuilder;
 import org.opendaylight.yangtools.yang.common.RpcError;
@@ -293,8 +296,63 @@ public class ClusterAdminRpcServiceTest {
     }
 
     @Test
-    public void testRemoveShardReplica() {
-        // TODO implement
+    public void testRemoveShardReplica() throws Exception {
+        String name = "testRemoveShardReplicaLocal";
+        String moduleShardsConfig = "module-shards-member1-and-2-and-3.conf";
+        MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name ).
+                moduleShardsConfig(moduleShardsConfig).
+                datastoreContextBuilder(DatastoreContext.newBuilder().
+                        shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(1)).build();
+
+        MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name).
+                moduleShardsConfig(moduleShardsConfig).build();
+
+        MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name).
+                moduleShardsConfig(moduleShardsConfig).build();
+
+        leaderNode1.configDataStore().waitTillReady();
+        verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2", "member-3");
+        verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1", "member-3");
+        verifyRaftPeersPresent(replicaNode3.configDataStore(), "cars", "member-1", "member-2");
+
+        // Invoke RPC service on member-3 to remove it's local shard
+
+        ClusterAdminRpcService service3 = new ClusterAdminRpcService(replicaNode3.configDataStore(),
+                replicaNode3.operDataStore());
+
+        RpcResult<Void> rpcResult = service3.removeShardReplica(new RemoveShardReplicaInputBuilder().
+                setShardName("cars").setMemberName("member-3").setDataStoreType(DataStoreType.Config).build()).
+                        get(10, TimeUnit.SECONDS);
+        verifySuccessfulRpcResult(rpcResult);
+        service3.close();
+
+        verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars", "member-2");
+        verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1");
+        verifyNoShardPresent(replicaNode3.configDataStore(), "cars");
+
+        // Restart member-2 and verify member-3 isn't present.
+
+        Cluster.get(leaderNode1.kit().getSystem()).down(Cluster.get(replicaNode2.kit().getSystem()).selfAddress());
+        replicaNode2.cleanup();
+
+        replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name).
+                moduleShardsConfig(moduleShardsConfig).build();
+
+        verifyRaftPeersPresent(replicaNode2.configDataStore(), "cars", "member-1");
+
+        // Invoke RPC service on member-1 to remove member-2
+
+        ClusterAdminRpcService service1 = new ClusterAdminRpcService(leaderNode1.configDataStore(),
+                leaderNode1.operDataStore());
+
+        rpcResult = service1.removeShardReplica(new RemoveShardReplicaInputBuilder().
+                setShardName("cars").setMemberName("member-2").setDataStoreType(DataStoreType.Config).build()).
+                        get(10, TimeUnit.SECONDS);
+        verifySuccessfulRpcResult(rpcResult);
+        service1.close();
+
+        verifyRaftPeersPresent(leaderNode1.configDataStore(), "cars");
+        verifyNoShardPresent(replicaNode2.configDataStore(), "cars");
     }
 
     @Test