Bug 7806 - Implement agent RPCs for shard replica manipulation testing
[controller.git] / opendaylight / md-sal / sal-cluster-admin-impl / src / main / java / org / opendaylight / controller / cluster / datastore / admin / ClusterAdminRpcService.java
index d51753c3eac0ac16225bed70909afb3c536c9e9d..3627bd80facea719dddddcd61065218efff1d2d7 100644 (file)
@@ -33,14 +33,18 @@ import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.SerializationUtils;
 import org.opendaylight.controller.cluster.access.concepts.MemberName;
 import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
+import org.opendaylight.controller.cluster.datastore.messages.AddPrefixShardReplica;
 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
 import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus;
 import org.opendaylight.controller.cluster.datastore.messages.FlipShardMembersVotingStatus;
+import org.opendaylight.controller.cluster.datastore.messages.RemovePrefixShardReplica;
 import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshotList;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
+import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddPrefixShardReplicaInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsOutputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddShardReplicaInput;
@@ -56,13 +60,16 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controll
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasOutputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemovePrefixShardReplicaInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.member.voting.states.input.MemberVotingState;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResult;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResultBuilder;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -78,11 +85,14 @@ public class ClusterAdminRpcService implements ClusterAdminService {
 
     private final DistributedDataStoreInterface configDataStore;
     private final DistributedDataStoreInterface operDataStore;
+    private final BindingNormalizedNodeSerializer serializer;
 
     public ClusterAdminRpcService(DistributedDataStoreInterface configDataStore,
-            DistributedDataStoreInterface operDataStore) {
+            DistributedDataStoreInterface operDataStore,
+            BindingNormalizedNodeSerializer serializer) {
         this.configDataStore = configDataStore;
         this.operDataStore = operDataStore;
+        this.serializer = serializer;
     }
 
     @Override
@@ -157,6 +167,83 @@ public class ClusterAdminRpcService implements ClusterAdminService {
         return returnFuture;
     }
 
+    @Override
+    public Future<RpcResult<Void>> addPrefixShardReplica(final AddPrefixShardReplicaInput input) {
+
+        final InstanceIdentifier<?> identifier = input.getShardPrefix();
+        if (identifier == null) {
+            return newFailedRpcResultFuture("A valid shard identifier must be specified");
+        }
+
+        final DataStoreType dataStoreType = input.getDataStoreType();
+        if (dataStoreType == null) {
+            return newFailedRpcResultFuture("A valid DataStoreType must be specified");
+        }
+
+        LOG.info("Adding replica for shard {}, datastore type {}", identifier, dataStoreType);
+
+        final YangInstanceIdentifier prefix = serializer.toYangInstanceIdentifier(identifier);
+        final SettableFuture<RpcResult<Void>> returnFuture = SettableFuture.create();
+        ListenableFuture<Success> future = sendMessageToShardManager(dataStoreType, new AddPrefixShardReplica(prefix));
+        Futures.addCallback(future, new FutureCallback<Success>() {
+            @Override
+            public void onSuccess(Success success) {
+                LOG.info("Successfully added replica for shard {}", prefix);
+                returnFuture.set(newSuccessfulResult());
+            }
+
+            @Override
+            public void onFailure(Throwable failure) {
+                onMessageFailure(String.format("Failed to add replica for shard %s", prefix),
+                        returnFuture, failure);
+            }
+        });
+
+        return returnFuture;
+    }
+
+    @Override
+    public Future<RpcResult<Void>> removePrefixShardReplica(final RemovePrefixShardReplicaInput input) {
+
+        final InstanceIdentifier<?> identifier = input.getShardPrefix();
+        if (identifier == null) {
+            return newFailedRpcResultFuture("A valid shard identifier must be specified");
+        }
+
+        final DataStoreType dataStoreType = input.getDataStoreType();
+        if (dataStoreType == null) {
+            return newFailedRpcResultFuture("A valid DataStoreType must be specified");
+        }
+
+        final String memberName = input.getMemberName();
+        if (Strings.isNullOrEmpty(memberName)) {
+            return newFailedRpcResultFuture("A valid member name must be specified");
+        }
+
+        LOG.info("Removing replica for shard {} memberName {}, datastoreType {}",
+                identifier, memberName, dataStoreType);
+        final YangInstanceIdentifier prefix = serializer.toYangInstanceIdentifier(identifier);
+
+        final SettableFuture<RpcResult<Void>> returnFuture = SettableFuture.create();
+        final ListenableFuture<Success> future = sendMessageToShardManager(dataStoreType,
+                new RemovePrefixShardReplica(prefix, MemberName.forName(memberName)));
+        Futures.addCallback(future, new FutureCallback<Success>() {
+            @Override
+            public void onSuccess(final Success success) {
+                LOG.info("Successfully removed replica for shard {}", prefix);
+                returnFuture.set(newSuccessfulResult());
+            }
+
+            @Override
+            public void onFailure(final Throwable failure) {
+                onMessageFailure(String.format("Failed to remove replica for shard %s", prefix),
+                        returnFuture, failure);
+            }
+        });
+
+        return returnFuture;
+    }
+
     @Override
     public Future<RpcResult<AddReplicasForAllShardsOutput>> addReplicasForAllShards() {
         LOG.info("Adding replicas for all shards");