+ @Override
+ public Future<RpcResult<Void>> addPrefixShardReplica(final AddPrefixShardReplicaInput input) {
+
+ final InstanceIdentifier<?> identifier = input.getShardPrefix();
+ if (identifier == null) {
+ return newFailedRpcResultFuture("A valid shard identifier must be specified");
+ }
+
+ final DataStoreType dataStoreType = input.getDataStoreType();
+ if (dataStoreType == null) {
+ return newFailedRpcResultFuture("A valid DataStoreType must be specified");
+ }
+
+ LOG.info("Adding replica for shard {}, datastore type {}", identifier, dataStoreType);
+
+ final YangInstanceIdentifier prefix = serializer.toYangInstanceIdentifier(identifier);
+ final SettableFuture<RpcResult<Void>> returnFuture = SettableFuture.create();
+ ListenableFuture<Success> future = sendMessageToShardManager(dataStoreType, new AddPrefixShardReplica(prefix));
+ Futures.addCallback(future, new FutureCallback<Success>() {
+ @Override
+ public void onSuccess(Success success) {
+ LOG.info("Successfully added replica for shard {}", prefix);
+ returnFuture.set(newSuccessfulResult());
+ }
+
+ @Override
+ public void onFailure(Throwable failure) {
+ onMessageFailure(String.format("Failed to add replica for shard %s", prefix),
+ returnFuture, failure);
+ }
+ });
+
+ return returnFuture;
+ }
+
+ @Override
+ public Future<RpcResult<Void>> removePrefixShardReplica(final RemovePrefixShardReplicaInput input) {
+
+ final InstanceIdentifier<?> identifier = input.getShardPrefix();
+ if (identifier == null) {
+ return newFailedRpcResultFuture("A valid shard identifier must be specified");
+ }
+
+ final DataStoreType dataStoreType = input.getDataStoreType();
+ if (dataStoreType == null) {
+ return newFailedRpcResultFuture("A valid DataStoreType must be specified");
+ }
+
+ final String memberName = input.getMemberName();
+ if (Strings.isNullOrEmpty(memberName)) {
+ return newFailedRpcResultFuture("A valid member name must be specified");
+ }
+
+ LOG.info("Removing replica for shard {} memberName {}, datastoreType {}",
+ identifier, memberName, dataStoreType);
+ final YangInstanceIdentifier prefix = serializer.toYangInstanceIdentifier(identifier);
+
+ final SettableFuture<RpcResult<Void>> returnFuture = SettableFuture.create();
+ final ListenableFuture<Success> future = sendMessageToShardManager(dataStoreType,
+ new RemovePrefixShardReplica(prefix, MemberName.forName(memberName)));
+ Futures.addCallback(future, new FutureCallback<Success>() {
+ @Override
+ public void onSuccess(final Success success) {
+ LOG.info("Successfully removed replica for shard {}", prefix);
+ returnFuture.set(newSuccessfulResult());
+ }
+
+ @Override
+ public void onFailure(final Throwable failure) {
+ onMessageFailure(String.format("Failed to remove replica for shard %s", prefix),
+ returnFuture, failure);
+ }
+ });
+
+ return returnFuture;
+ }
+