atomic-storage: remove type dependency at segment level I/O
[controller.git] / opendaylight / md-sal / sal-cluster-admin-impl / src / main / java / org / opendaylight / controller / cluster / datastore / admin / ClusterAdminRpcService.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.admin;
9
10 import akka.actor.ActorRef;
11 import akka.actor.ActorSelection;
12 import akka.actor.Status.Success;
13 import akka.dispatch.OnComplete;
14 import akka.pattern.Patterns;
15 import akka.util.Timeout;
16 import com.google.common.annotations.VisibleForTesting;
17 import com.google.common.base.Strings;
18 import com.google.common.base.Throwables;
19 import com.google.common.collect.ImmutableMap;
20 import com.google.common.collect.Maps;
21 import com.google.common.util.concurrent.FutureCallback;
22 import com.google.common.util.concurrent.Futures;
23 import com.google.common.util.concurrent.ListenableFuture;
24 import com.google.common.util.concurrent.MoreExecutors;
25 import com.google.common.util.concurrent.SettableFuture;
26 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
27 import java.io.FileOutputStream;
28 import java.io.IOException;
29 import java.util.AbstractMap.SimpleEntry;
30 import java.util.ArrayList;
31 import java.util.HashMap;
32 import java.util.List;
33 import java.util.Map;
34 import java.util.Map.Entry;
35 import java.util.Set;
36 import java.util.concurrent.ExecutionException;
37 import java.util.concurrent.TimeUnit;
38 import java.util.function.Function;
39 import java.util.stream.Collectors;
40 import org.apache.commons.lang3.SerializationUtils;
41 import org.eclipse.jdt.annotation.NonNull;
42 import org.opendaylight.controller.cluster.access.concepts.MemberName;
43 import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
44 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
45 import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus;
46 import org.opendaylight.controller.cluster.datastore.messages.FlipShardMembersVotingStatus;
47 import org.opendaylight.controller.cluster.datastore.messages.GetKnownClients;
48 import org.opendaylight.controller.cluster.datastore.messages.GetKnownClientsReply;
49 import org.opendaylight.controller.cluster.datastore.messages.GetShardRole;
50 import org.opendaylight.controller.cluster.datastore.messages.GetShardRoleReply;
51 import org.opendaylight.controller.cluster.datastore.messages.MakeLeaderLocal;
52 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
53 import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
54 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
55 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshotList;
56 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
57 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
58 import org.opendaylight.controller.eos.akka.DataCenterControl;
59 import org.opendaylight.mdsal.binding.api.RpcProviderService;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ActivateEosDatacenter;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ActivateEosDatacenterInput;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ActivateEosDatacenterOutput;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShards;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsInput;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsOutput;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsOutputBuilder;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddShardReplicaInput;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddShardReplicaOutput;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddShardReplicaOutputBuilder;
70 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.BackupDatastore;
71 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.BackupDatastoreInput;
72 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.BackupDatastoreOutput;
73 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.BackupDatastoreOutputBuilder;
74 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForAllShards;
75 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForAllShardsInput;
76 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForAllShardsOutput;
77 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForAllShardsOutputBuilder;
78 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForShard;
79 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForShardInput;
80 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForShardOutput;
81 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ChangeMemberVotingStatesForShardOutputBuilder;
82 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DataStoreType;
83 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DeactivateEosDatacenter;
84 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DeactivateEosDatacenterInput;
85 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DeactivateEosDatacenterOutput;
86 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShards;
87 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsInput;
88 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsOutput;
89 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsOutputBuilder;
90 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetKnownClientsForAllShards;
91 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetKnownClientsForAllShardsInput;
92 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetKnownClientsForAllShardsOutput;
93 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetKnownClientsForAllShardsOutputBuilder;
94 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetShardRoleInput;
95 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetShardRoleOutput;
96 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetShardRoleOutputBuilder;
97 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.LocateShard;
98 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.LocateShardInput;
99 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.LocateShardOutput;
100 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.LocateShardOutputBuilder;
101 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.MakeLeaderLocalInput;
102 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.MakeLeaderLocalOutput;
103 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.MakeLeaderLocalOutputBuilder;
104 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicas;
105 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasInput;
106 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasOutput;
107 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasOutputBuilder;
108 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaInput;
109 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaOutput;
110 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaOutputBuilder;
111 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.get.known.clients._for.all.shards.output.ShardResult1Builder;
112 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.get.known.clients._for.all.shards.output.shard.result.KnownClients;
113 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.get.known.clients._for.all.shards.output.shard.result.KnownClientsBuilder;
114 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.locate.shard.output.member.node.LeaderActorRefBuilder;
115 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.locate.shard.output.member.node.LocalBuilder;
116 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.member.voting.states.input.MemberVotingState;
117 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResult;
118 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResultBuilder;
119 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResultKey;
120 import org.opendaylight.yangtools.concepts.Registration;
121 import org.opendaylight.yangtools.yang.common.Empty;
122 import org.opendaylight.yangtools.yang.common.ErrorType;
123 import org.opendaylight.yangtools.yang.common.RpcResult;
124 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
125 import org.opendaylight.yangtools.yang.common.Uint32;
126 import org.slf4j.Logger;
127 import org.slf4j.LoggerFactory;
128 import scala.concurrent.Future;
129
130 /**
131  * Implements the yang RPCs defined in the generated ClusterAdminService interface.
132  *
133  * @author Thomas Pantelis
134  */
135 public final class ClusterAdminRpcService {
136     private static final Timeout SHARD_MGR_TIMEOUT = new Timeout(1, TimeUnit.MINUTES);
137
138     private static final Logger LOG = LoggerFactory.getLogger(ClusterAdminRpcService.class);
139     private static final @NonNull RpcResult<LocateShardOutput> LOCAL_SHARD_RESULT =
140             RpcResultBuilder.success(new LocateShardOutputBuilder()
141                 .setMemberNode(new LocalBuilder().setLocal(Empty.value()).build())
142                 .build())
143             .build();
144
145     private final DistributedDataStoreInterface configDataStore;
146     private final DistributedDataStoreInterface operDataStore;
147     private final Timeout makeLeaderLocalTimeout;
148     private final DataCenterControl dataCenterControl;
149
150     public ClusterAdminRpcService(final DistributedDataStoreInterface configDataStore,
151                                   final DistributedDataStoreInterface operDataStore,
152                                   final DataCenterControl dataCenterControl) {
153         this.configDataStore = configDataStore;
154         this.operDataStore = operDataStore;
155
156         makeLeaderLocalTimeout =
157                 new Timeout(configDataStore.getActorUtils().getDatastoreContext()
158                         .getShardLeaderElectionTimeout().duration().$times(2));
159
160         this.dataCenterControl = dataCenterControl;
161     }
162
163     Registration registerWith(final RpcProviderService rpcProviderService) {
164         return rpcProviderService.registerRpcImplementations(
165             (org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013
166                 .AddShardReplica) this::addShardReplica,
167             (org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013
168                 .RemoveShardReplica) this::removeShardReplica,
169             (LocateShard) this::locateShard,
170             (org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013
171                 .MakeLeaderLocal) this::makeLeaderLocal,
172             (AddReplicasForAllShards) this::addReplicasForAllShards,
173             (RemoveAllShardReplicas) this::removeAllShardReplicas,
174             (ChangeMemberVotingStatesForShard) this::changeMemberVotingStatesForShard,
175             (ChangeMemberVotingStatesForAllShards) this::changeMemberVotingStatesForAllShards,
176             (FlipMemberVotingStatesForAllShards) this::flipMemberVotingStatesForAllShards,
177             (org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013
178                 .GetShardRole) this::getShardRole,
179             (BackupDatastore) this::backupDatastore,
180             (GetKnownClientsForAllShards) this::getKnownClientsForAllShards,
181             (ActivateEosDatacenter) this::activateEosDatacenter,
182             (DeactivateEosDatacenter) this::deactivateEosDatacenter);
183     }
184
185     @VisibleForTesting
186     ListenableFuture<RpcResult<AddShardReplicaOutput>> addShardReplica(final AddShardReplicaInput input) {
187         final String shardName = input.getShardName();
188         if (Strings.isNullOrEmpty(shardName)) {
189             return newFailedRpcResultFuture("A valid shard name must be specified");
190         }
191
192         DataStoreType dataStoreType = input.getDataStoreType();
193         if (dataStoreType == null) {
194             return newFailedRpcResultFuture("A valid DataStoreType must be specified");
195         }
196
197         LOG.info("Adding replica for shard {}", shardName);
198
199         final var returnFuture = SettableFuture.<RpcResult<AddShardReplicaOutput>>create();
200         Futures.addCallback(sendMessageToShardManager(dataStoreType, new AddShardReplica(shardName)),
201             new FutureCallback<Success>() {
202                 @Override
203                 public void onSuccess(final Success success) {
204                     LOG.info("Successfully added replica for shard {}", shardName);
205                     returnFuture.set(newSuccessfulResult(new AddShardReplicaOutputBuilder().build()));
206                 }
207
208                 @Override
209                 public void onFailure(final Throwable failure) {
210                     onMessageFailure(String.format("Failed to add replica for shard %s", shardName),
211                         returnFuture, failure);
212                 }
213             }, MoreExecutors.directExecutor());
214
215         return returnFuture;
216     }
217
218     @VisibleForTesting
219     ListenableFuture<RpcResult<RemoveShardReplicaOutput>> removeShardReplica(final RemoveShardReplicaInput input) {
220         final String shardName = input.getShardName();
221         if (Strings.isNullOrEmpty(shardName)) {
222             return newFailedRpcResultFuture("A valid shard name must be specified");
223         }
224
225         DataStoreType dataStoreType = input.getDataStoreType();
226         if (dataStoreType == null) {
227             return newFailedRpcResultFuture("A valid DataStoreType must be specified");
228         }
229
230         final String memberName = input.getMemberName();
231         if (Strings.isNullOrEmpty(memberName)) {
232             return newFailedRpcResultFuture("A valid member name must be specified");
233         }
234
235         LOG.info("Removing replica for shard {} memberName {}, datastoreType {}", shardName, memberName, dataStoreType);
236
237         final SettableFuture<RpcResult<RemoveShardReplicaOutput>> returnFuture = SettableFuture.create();
238         ListenableFuture<Success> future = sendMessageToShardManager(dataStoreType,
239                 new RemoveShardReplica(shardName, MemberName.forName(memberName)));
240         Futures.addCallback(future, new FutureCallback<Success>() {
241             @Override
242             public void onSuccess(final Success success) {
243                 LOG.info("Successfully removed replica for shard {}", shardName);
244                 returnFuture.set(newSuccessfulResult(new RemoveShardReplicaOutputBuilder().build()));
245             }
246
247             @Override
248             public void onFailure(final Throwable failure) {
249                 onMessageFailure(String.format("Failed to remove replica for shard %s", shardName),
250                         returnFuture, failure);
251             }
252         }, MoreExecutors.directExecutor());
253
254         return returnFuture;
255     }
256
257     private ListenableFuture<RpcResult<LocateShardOutput>> locateShard(final LocateShardInput input) {
258         final ActorUtils utils;
259         switch (input.getDataStoreType()) {
260             case Config:
261                 utils = configDataStore.getActorUtils();
262                 break;
263             case Operational:
264                 utils = operDataStore.getActorUtils();
265                 break;
266             default:
267                 return newFailedRpcResultFuture("Unhandled datastore in " + input);
268         }
269
270         final SettableFuture<RpcResult<LocateShardOutput>> ret = SettableFuture.create();
271         utils.findPrimaryShardAsync(input.getShardName()).onComplete(new OnComplete<PrimaryShardInfo>() {
272             @Override
273             public void onComplete(final Throwable failure, final PrimaryShardInfo success) throws Throwable {
274                 if (failure != null) {
275                     LOG.debug("Failed to find shard for {}", input, failure);
276                     ret.setException(failure);
277                     return;
278                 }
279
280                 // Data tree implies local leak
281                 if (success.getLocalShardDataTree().isPresent()) {
282                     ret.set(LOCAL_SHARD_RESULT);
283                     return;
284                 }
285
286                 final ActorSelection actorPath = success.getPrimaryShardActor();
287                 ret.set(newSuccessfulResult(new LocateShardOutputBuilder()
288                     .setMemberNode(new LeaderActorRefBuilder()
289                         .setLeaderActorRef(actorPath.toSerializationFormat())
290                         .build())
291                     .build()));
292             }
293         }, utils.getClientDispatcher());
294
295         return ret;
296     }
297
298     @VisibleForTesting
299     ListenableFuture<RpcResult<MakeLeaderLocalOutput>> makeLeaderLocal(final MakeLeaderLocalInput input) {
300         final String shardName = input.getShardName();
301         if (Strings.isNullOrEmpty(shardName)) {
302             return newFailedRpcResultFuture("A valid shard name must be specified");
303         }
304
305         DataStoreType dataStoreType = input.getDataStoreType();
306         if (dataStoreType == null) {
307             return newFailedRpcResultFuture("A valid DataStoreType must be specified");
308         }
309
310         ActorUtils actorUtils = dataStoreType == DataStoreType.Config
311                 ? configDataStore.getActorUtils() : operDataStore.getActorUtils();
312
313         LOG.info("Moving leader to local node {} for shard {}, datastoreType {}",
314                 actorUtils.getCurrentMemberName().getName(), shardName, dataStoreType);
315
316         final Future<ActorRef> localShardReply = actorUtils.findLocalShardAsync(shardName);
317
318         final scala.concurrent.Promise<Object> makeLeaderLocalAsk = akka.dispatch.Futures.promise();
319         localShardReply.onComplete(new OnComplete<ActorRef>() {
320             @Override
321             public void onComplete(final Throwable failure, final ActorRef actorRef) {
322                 if (failure != null) {
323                     LOG.warn("No local shard found for {} datastoreType {} - Cannot request leadership transfer to"
324                             + " local shard.", shardName, dataStoreType, failure);
325                     makeLeaderLocalAsk.failure(failure);
326                 } else {
327                     makeLeaderLocalAsk
328                             .completeWith(actorUtils
329                                     .executeOperationAsync(actorRef, MakeLeaderLocal.INSTANCE, makeLeaderLocalTimeout));
330                 }
331             }
332         }, actorUtils.getClientDispatcher());
333
334         final SettableFuture<RpcResult<MakeLeaderLocalOutput>> future = SettableFuture.create();
335         makeLeaderLocalAsk.future().onComplete(new OnComplete<>() {
336             @Override
337             public void onComplete(final Throwable failure, final Object success) {
338                 if (failure != null) {
339                     LOG.error("Leadership transfer failed for shard {}.", shardName, failure);
340                     future.set(RpcResultBuilder.<MakeLeaderLocalOutput>failed().withError(ErrorType.APPLICATION,
341                             "leadership transfer failed", failure).build());
342                     return;
343                 }
344
345                 LOG.debug("Leadership transfer complete");
346                 future.set(RpcResultBuilder.success(new MakeLeaderLocalOutputBuilder().build()).build());
347             }
348         }, actorUtils.getClientDispatcher());
349
350         return future;
351     }
352
353     @VisibleForTesting ListenableFuture<RpcResult<AddReplicasForAllShardsOutput>> addReplicasForAllShards(
354             final AddReplicasForAllShardsInput input) {
355         LOG.info("Adding replicas for all shards");
356
357         final List<Entry<ListenableFuture<Success>, ShardResultBuilder>> shardResultData = new ArrayList<>();
358
359         sendMessageToManagerForConfiguredShards(DataStoreType.Config, shardResultData, AddShardReplica::new);
360         sendMessageToManagerForConfiguredShards(DataStoreType.Operational, shardResultData, AddShardReplica::new);
361
362         return waitForShardResults(shardResultData, shardResults ->
363                 new AddReplicasForAllShardsOutputBuilder().setShardResult(shardResults).build(),
364                 "Failed to add replica");
365     }
366
367     @VisibleForTesting ListenableFuture<RpcResult<RemoveAllShardReplicasOutput>> removeAllShardReplicas(
368             final RemoveAllShardReplicasInput input) {
369         LOG.info("Removing replicas for all shards");
370
371         final String memberName = input.getMemberName();
372         if (Strings.isNullOrEmpty(memberName)) {
373             return newFailedRpcResultFuture("A valid member name must be specified");
374         }
375
376         final List<Entry<ListenableFuture<Success>, ShardResultBuilder>> shardResultData = new ArrayList<>();
377         Function<String, Object> messageSupplier = shardName ->
378                 new RemoveShardReplica(shardName, MemberName.forName(memberName));
379
380         sendMessageToManagerForConfiguredShards(DataStoreType.Config, shardResultData, messageSupplier);
381         sendMessageToManagerForConfiguredShards(DataStoreType.Operational, shardResultData, messageSupplier);
382
383         return waitForShardResults(shardResultData,
384             shardResults -> new RemoveAllShardReplicasOutputBuilder().setShardResult(shardResults).build(),
385             "       Failed to remove replica");
386     }
387
388     @VisibleForTesting
389     ListenableFuture<RpcResult<ChangeMemberVotingStatesForShardOutput>> changeMemberVotingStatesForShard(
390             final ChangeMemberVotingStatesForShardInput input) {
391         final String shardName = input.getShardName();
392         if (Strings.isNullOrEmpty(shardName)) {
393             return newFailedRpcResultFuture("A valid shard name must be specified");
394         }
395
396         final var dataStoreType = input.getDataStoreType();
397         if (dataStoreType == null) {
398             return newFailedRpcResultFuture("A valid DataStoreType must be specified");
399         }
400
401         final var memberVotingStates = input.getMemberVotingState();
402         if (memberVotingStates == null || memberVotingStates.isEmpty()) {
403             return newFailedRpcResultFuture("No member voting state input was specified");
404         }
405
406         final var changeVotingStatus = toChangeShardMembersVotingStatus(shardName, memberVotingStates);
407         LOG.info("Change member voting states for shard {}: {}", shardName,
408                 changeVotingStatus.getMeberVotingStatusMap());
409
410         final var returnFuture = SettableFuture.<RpcResult<ChangeMemberVotingStatesForShardOutput>>create();
411         Futures.addCallback(sendMessageToShardManager(dataStoreType, changeVotingStatus),
412             new FutureCallback<Success>() {
413                 @Override
414                 public void onSuccess(final Success success) {
415                     LOG.info("Successfully changed member voting states for shard {}", shardName);
416                     returnFuture.set(newSuccessfulResult(new ChangeMemberVotingStatesForShardOutputBuilder().build()));
417                 }
418
419                 @Override
420                 public void onFailure(final Throwable failure) {
421                     onMessageFailure(String.format("Failed to change member voting states for shard %s", shardName),
422                         returnFuture, failure);
423                 }
424             }, MoreExecutors.directExecutor());
425
426         return returnFuture;
427     }
428
429     @VisibleForTesting
430     ListenableFuture<RpcResult<ChangeMemberVotingStatesForAllShardsOutput>> changeMemberVotingStatesForAllShards(
431             final ChangeMemberVotingStatesForAllShardsInput input) {
432         List<MemberVotingState> memberVotingStates = input.getMemberVotingState();
433         if (memberVotingStates == null || memberVotingStates.isEmpty()) {
434             return newFailedRpcResultFuture("No member voting state input was specified");
435         }
436
437         final List<Entry<ListenableFuture<Success>, ShardResultBuilder>> shardResultData = new ArrayList<>();
438         Function<String, Object> messageSupplier = shardName ->
439                 toChangeShardMembersVotingStatus(shardName, memberVotingStates);
440
441         LOG.info("Change member voting states for all shards");
442
443         sendMessageToManagerForConfiguredShards(DataStoreType.Config, shardResultData, messageSupplier);
444         sendMessageToManagerForConfiguredShards(DataStoreType.Operational, shardResultData, messageSupplier);
445
446         return waitForShardResults(shardResultData, shardResults ->
447                 new ChangeMemberVotingStatesForAllShardsOutputBuilder().setShardResult(shardResults).build(),
448                 "Failed to change member voting states");
449     }
450
451     @VisibleForTesting
452     ListenableFuture<RpcResult<FlipMemberVotingStatesForAllShardsOutput>> flipMemberVotingStatesForAllShards(
453             final FlipMemberVotingStatesForAllShardsInput input) {
454         final var shardResultData = new ArrayList<Entry<ListenableFuture<Success>, ShardResultBuilder>>();
455         final Function<String, Object> messageSupplier = FlipShardMembersVotingStatus::new;
456
457         LOG.info("Flip member voting states for all shards");
458
459         sendMessageToManagerForConfiguredShards(DataStoreType.Config, shardResultData, messageSupplier);
460         sendMessageToManagerForConfiguredShards(DataStoreType.Operational, shardResultData, messageSupplier);
461
462         return waitForShardResults(shardResultData, shardResults ->
463                 new FlipMemberVotingStatesForAllShardsOutputBuilder().setShardResult(shardResults).build(),
464                 "Failed to change member voting states");
465     }
466
467     private ListenableFuture<RpcResult<GetShardRoleOutput>> getShardRole(final GetShardRoleInput input) {
468         final String shardName = input.getShardName();
469         if (Strings.isNullOrEmpty(shardName)) {
470             return newFailedRpcResultFuture("A valid shard name must be specified");
471         }
472
473         DataStoreType dataStoreType = input.getDataStoreType();
474         if (dataStoreType == null) {
475             return newFailedRpcResultFuture("A valid DataStoreType must be specified");
476         }
477
478         LOG.info("Getting role for shard {}, datastore type {}", shardName, dataStoreType);
479
480         final SettableFuture<RpcResult<GetShardRoleOutput>> returnFuture = SettableFuture.create();
481         ListenableFuture<GetShardRoleReply> future = sendMessageToShardManager(dataStoreType,
482                 new GetShardRole(shardName));
483         Futures.addCallback(future, new FutureCallback<GetShardRoleReply>() {
484             @Override
485             public void onSuccess(final GetShardRoleReply reply) {
486                 if (reply == null) {
487                     returnFuture.set(ClusterAdminRpcService.<GetShardRoleOutput>newFailedRpcResultBuilder(
488                             "No Shard role present. Please retry..").build());
489                     return;
490                 }
491                 LOG.info("Successfully received role:{} for shard {}", reply.getRole(), shardName);
492                 final GetShardRoleOutputBuilder builder = new GetShardRoleOutputBuilder();
493                 if (reply.getRole() != null) {
494                     builder.setRole(reply.getRole());
495                 }
496                 returnFuture.set(newSuccessfulResult(builder.build()));
497             }
498
499             @Override
500             public void onFailure(final Throwable failure) {
501                 returnFuture.set(ClusterAdminRpcService.<GetShardRoleOutput>newFailedRpcResultBuilder(
502                         "Failed to get shard role.", failure).build());
503             }
504         }, MoreExecutors.directExecutor());
505
506         return returnFuture;
507     }
508
509     @VisibleForTesting
510     ListenableFuture<RpcResult<BackupDatastoreOutput>> backupDatastore(final BackupDatastoreInput input) {
511         LOG.debug("backupDatastore: {}", input);
512
513         if (Strings.isNullOrEmpty(input.getFilePath())) {
514             return newFailedRpcResultFuture("A valid file path must be specified");
515         }
516
517         final Uint32 timeout = input.getTimeout();
518         final Timeout opTimeout = timeout != null ? Timeout.apply(timeout.longValue(), TimeUnit.SECONDS)
519                 : SHARD_MGR_TIMEOUT;
520
521         final SettableFuture<RpcResult<BackupDatastoreOutput>> returnFuture = SettableFuture.create();
522         ListenableFuture<List<DatastoreSnapshot>> future = sendMessageToShardManagers(new GetSnapshot(opTimeout));
523         Futures.addCallback(future, new FutureCallback<>() {
524             @Override
525             public void onSuccess(final List<DatastoreSnapshot> snapshots) {
526                 saveSnapshotsToFile(new DatastoreSnapshotList(snapshots), input.getFilePath(), returnFuture);
527             }
528
529             @Override
530             public void onFailure(final Throwable failure) {
531                 onDatastoreBackupFailure(input.getFilePath(), returnFuture, failure);
532             }
533         }, MoreExecutors.directExecutor());
534
535         return returnFuture;
536     }
537
538     private ListenableFuture<RpcResult<GetKnownClientsForAllShardsOutput>> getKnownClientsForAllShards(
539             final GetKnownClientsForAllShardsInput input) {
540         final ImmutableMap<ShardIdentifier, ListenableFuture<GetKnownClientsReply>> allShardReplies =
541                 getAllShardLeadersClients();
542         return Futures.whenAllComplete(allShardReplies.values()).call(() -> processReplies(allShardReplies),
543             MoreExecutors.directExecutor());
544     }
545
546     private ListenableFuture<RpcResult<ActivateEosDatacenterOutput>> activateEosDatacenter(
547             final ActivateEosDatacenterInput input) {
548         LOG.debug("Activating EOS Datacenter");
549         final SettableFuture<RpcResult<ActivateEosDatacenterOutput>> future = SettableFuture.create();
550         Futures.addCallback(dataCenterControl.activateDataCenter(), new FutureCallback<>() {
551             @Override
552             public void onSuccess(final Empty result) {
553                 LOG.debug("Successfully activated datacenter.");
554                 future.set(RpcResultBuilder.<ActivateEosDatacenterOutput>success().build());
555             }
556
557             @Override
558             public void onFailure(final Throwable failure) {
559                 future.set(ClusterAdminRpcService.<ActivateEosDatacenterOutput>newFailedRpcResultBuilder(
560                         "Failed to activate datacenter.", failure).build());
561             }
562         }, MoreExecutors.directExecutor());
563
564         return future;
565     }
566
567     private ListenableFuture<RpcResult<DeactivateEosDatacenterOutput>> deactivateEosDatacenter(
568             final DeactivateEosDatacenterInput input) {
569         LOG.debug("Deactivating EOS Datacenter");
570         final SettableFuture<RpcResult<DeactivateEosDatacenterOutput>> future = SettableFuture.create();
571         Futures.addCallback(dataCenterControl.deactivateDataCenter(), new FutureCallback<>() {
572             @Override
573             public void onSuccess(final Empty result) {
574                 LOG.debug("Successfully deactivated datacenter.");
575                 future.set(RpcResultBuilder.<DeactivateEosDatacenterOutput>success().build());
576             }
577
578             @Override
579             public void onFailure(final Throwable failure) {
580                 future.set(ClusterAdminRpcService.<DeactivateEosDatacenterOutput>newFailedRpcResultBuilder(
581                         "Failed to deactivate datacenter.", failure).build());
582             }
583         }, MoreExecutors.directExecutor());
584
585         return future;
586     }
587
588     private static RpcResult<GetKnownClientsForAllShardsOutput> processReplies(
589             final ImmutableMap<ShardIdentifier, ListenableFuture<GetKnownClientsReply>> allShardReplies) {
590         final Map<ShardResultKey, ShardResult> result = Maps.newHashMapWithExpectedSize(allShardReplies.size());
591         for (Entry<ShardIdentifier, ListenableFuture<GetKnownClientsReply>> entry : allShardReplies.entrySet()) {
592             final ListenableFuture<GetKnownClientsReply> future = entry.getValue();
593             final ShardResultBuilder builder = new ShardResultBuilder()
594                     .setDataStoreType(entry.getKey().getDataStoreType())
595                     .setShardName(entry.getKey().getShardName());
596
597             final GetKnownClientsReply reply;
598             try {
599                 reply = Futures.getDone(future);
600             } catch (ExecutionException e) {
601                 LOG.debug("Shard {} failed to answer", entry.getKey(), e);
602                 final ShardResult sr = builder
603                         .setSucceeded(Boolean.FALSE)
604                         .setErrorMessage(e.getCause().getMessage())
605                         .build();
606                 result.put(sr.key(), sr);
607                 continue;
608             }
609
610             final ShardResult sr = builder
611                     .setSucceeded(Boolean.TRUE)
612                     .addAugmentation(new ShardResult1Builder()
613                         .setKnownClients(reply.getClients().stream()
614                             .map(client -> new KnownClientsBuilder()
615                                 .setMember(client.getFrontendId().getMemberName().toYang())
616                                 .setType(client.getFrontendId().getClientType().toYang())
617                                 .setGeneration(client.getYangGeneration())
618                                 .build())
619                             .collect(Collectors.toMap(KnownClients::key, Function.identity())))
620                         .build())
621                     .build();
622
623             result.put(sr.key(), sr);
624         }
625
626         return RpcResultBuilder.success(new GetKnownClientsForAllShardsOutputBuilder().setShardResult(result).build())
627                 .build();
628     }
629
630     private static ChangeShardMembersVotingStatus toChangeShardMembersVotingStatus(final String shardName,
631             final List<MemberVotingState> memberVotingStatus) {
632         Map<String, Boolean> serverVotingStatusMap = new HashMap<>();
633         for (MemberVotingState memberStatus: memberVotingStatus) {
634             serverVotingStatusMap.put(memberStatus.getMemberName(), memberStatus.getVoting());
635         }
636         return new ChangeShardMembersVotingStatus(shardName, serverVotingStatusMap);
637     }
638
639     private static <T> SettableFuture<RpcResult<T>> waitForShardResults(
640             final List<Entry<ListenableFuture<Success>, ShardResultBuilder>> shardResultData,
641             final Function<Map<ShardResultKey, ShardResult>, T> resultDataSupplier,
642             final String failureLogMsgPrefix) {
643         final SettableFuture<RpcResult<T>> returnFuture = SettableFuture.create();
644         final Map<ShardResultKey, ShardResult> shardResults = new HashMap<>();
645         for (final Entry<ListenableFuture<Success>, ShardResultBuilder> entry : shardResultData) {
646             Futures.addCallback(entry.getKey(), new FutureCallback<Success>() {
647                 @Override
648                 public void onSuccess(final Success result) {
649                     synchronized (shardResults) {
650                         final ShardResultBuilder builder = entry.getValue();
651                         LOG.debug("onSuccess for shard {}, type {}", builder.getShardName(),
652                             builder.getDataStoreType());
653                         final ShardResult sr = builder.setSucceeded(Boolean.TRUE).build();
654                         shardResults.put(sr.key(), sr);
655                         checkIfComplete();
656                     }
657                 }
658
659                 @Override
660                 public void onFailure(final Throwable failure) {
661                     synchronized (shardResults) {
662                         ShardResultBuilder builder = entry.getValue();
663                         LOG.warn("{} for shard {}, type {}", failureLogMsgPrefix, builder.getShardName(),
664                                 builder.getDataStoreType(), failure);
665                         final ShardResult sr = builder
666                                 .setSucceeded(Boolean.FALSE)
667                                 .setErrorMessage(Throwables.getRootCause(failure).getMessage())
668                                 .build();
669                         shardResults.put(sr.key(), sr);
670                         checkIfComplete();
671                     }
672                 }
673
674                 void checkIfComplete() {
675                     LOG.debug("checkIfComplete: expected {}, actual {}", shardResultData.size(), shardResults.size());
676                     if (shardResults.size() == shardResultData.size()) {
677                         returnFuture.set(newSuccessfulResult(resultDataSupplier.apply(shardResults)));
678                     }
679                 }
680             }, MoreExecutors.directExecutor());
681         }
682         return returnFuture;
683     }
684
685     private <T> void sendMessageToManagerForConfiguredShards(final DataStoreType dataStoreType,
686             final List<Entry<ListenableFuture<T>, ShardResultBuilder>> shardResultData,
687             final Function<String, Object> messageSupplier) {
688         ActorUtils actorUtils = dataStoreType == DataStoreType.Config ? configDataStore.getActorUtils()
689                 : operDataStore.getActorUtils();
690         Set<String> allShardNames = actorUtils.getConfiguration().getAllShardNames();
691
692         LOG.debug("Sending message to all shards {} for data store {}", allShardNames, actorUtils.getDataStoreName());
693
694         for (String shardName: allShardNames) {
695             ListenableFuture<T> future = this.ask(actorUtils.getShardManager(), messageSupplier.apply(shardName),
696                                                   SHARD_MGR_TIMEOUT);
697             shardResultData.add(new SimpleEntry<>(future,
698                     new ShardResultBuilder().setShardName(shardName).setDataStoreType(dataStoreType)));
699         }
700     }
701
702     private <T> ListenableFuture<List<T>> sendMessageToShardManagers(final Object message) {
703         Timeout timeout = SHARD_MGR_TIMEOUT;
704         ListenableFuture<T> configFuture = ask(configDataStore.getActorUtils().getShardManager(), message, timeout);
705         ListenableFuture<T> operFuture = ask(operDataStore.getActorUtils().getShardManager(), message, timeout);
706
707         return Futures.allAsList(configFuture, operFuture);
708     }
709
710     private <T> ListenableFuture<T> sendMessageToShardManager(final DataStoreType dataStoreType, final Object message) {
711         ActorRef shardManager = dataStoreType == DataStoreType.Config
712                 ? configDataStore.getActorUtils().getShardManager()
713                         : operDataStore.getActorUtils().getShardManager();
714         return ask(shardManager, message, SHARD_MGR_TIMEOUT);
715     }
716
717     @SuppressWarnings("checkstyle:IllegalCatch")
718     private static void saveSnapshotsToFile(final DatastoreSnapshotList snapshots, final String fileName,
719             final SettableFuture<RpcResult<BackupDatastoreOutput>> returnFuture) {
720         try (FileOutputStream fos = new FileOutputStream(fileName)) {
721             SerializationUtils.serialize(snapshots, fos);
722
723             returnFuture.set(newSuccessfulResult(new BackupDatastoreOutputBuilder().build()));
724             LOG.info("Successfully backed up datastore to file {}", fileName);
725         } catch (IOException | RuntimeException e) {
726             onDatastoreBackupFailure(fileName, returnFuture, e);
727         }
728     }
729
730     private static <T> void onDatastoreBackupFailure(final String fileName,
731             final SettableFuture<RpcResult<T>> returnFuture, final Throwable failure) {
732         onMessageFailure(String.format("Failed to back up datastore to file %s", fileName), returnFuture, failure);
733     }
734
735     @SuppressFBWarnings("SLF4J_SIGN_ONLY_FORMAT")
736     private static <T> void onMessageFailure(final String msg, final SettableFuture<RpcResult<T>> returnFuture,
737             final Throwable failure) {
738         LOG.error("{}", msg, failure);
739         returnFuture.set(ClusterAdminRpcService.<T>newFailedRpcResultBuilder(String.format("%s: %s", msg,
740                 failure.getMessage())).build());
741     }
742
743     private <T> ListenableFuture<T> ask(final ActorRef actor, final Object message, final Timeout timeout) {
744         final SettableFuture<T> returnFuture = SettableFuture.create();
745
746         @SuppressWarnings("unchecked")
747         Future<T> askFuture = (Future<T>) Patterns.ask(actor, message, timeout);
748         askFuture.onComplete(new OnComplete<T>() {
749             @Override
750             public void onComplete(final Throwable failure, final T resp) {
751                 if (failure != null) {
752                     returnFuture.setException(failure);
753                 } else {
754                     returnFuture.set(resp);
755                 }
756             }
757         }, configDataStore.getActorUtils().getClientDispatcher());
758
759         return returnFuture;
760     }
761
762     private ImmutableMap<ShardIdentifier, ListenableFuture<GetKnownClientsReply>> getAllShardLeadersClients() {
763         final ImmutableMap.Builder<ShardIdentifier, ListenableFuture<GetKnownClientsReply>> builder =
764                 ImmutableMap.builder();
765
766         addAllShardsClients(builder, DataStoreType.Config, configDataStore.getActorUtils());
767         addAllShardsClients(builder, DataStoreType.Operational, operDataStore.getActorUtils());
768
769         return builder.build();
770     }
771
772     private static void addAllShardsClients(
773             final ImmutableMap.Builder<ShardIdentifier, ListenableFuture<GetKnownClientsReply>> builder,
774             final DataStoreType type, final ActorUtils utils) {
775         for (String shardName : utils.getConfiguration().getAllShardNames()) {
776             final SettableFuture<GetKnownClientsReply> future = SettableFuture.create();
777             builder.put(new ShardIdentifier(type, shardName), future);
778
779             utils.findPrimaryShardAsync(shardName).flatMap(
780                 info -> Patterns.ask(info.getPrimaryShardActor(), GetKnownClients.INSTANCE, SHARD_MGR_TIMEOUT),
781                 utils.getClientDispatcher()).onComplete(new OnComplete<>() {
782                     @Override
783                     public void onComplete(final Throwable failure, final Object success) {
784                         if (failure == null) {
785                             future.set((GetKnownClientsReply) success);
786                         } else {
787                             future.setException(failure);
788                         }
789                     }
790                 }, utils.getClientDispatcher());
791         }
792     }
793
794     private static <T> ListenableFuture<RpcResult<T>> newFailedRpcResultFuture(final String message) {
795         return ClusterAdminRpcService.<T>newFailedRpcResultBuilder(message).buildFuture();
796     }
797
798     private static <T> RpcResultBuilder<T> newFailedRpcResultBuilder(final String message) {
799         return newFailedRpcResultBuilder(message, null);
800     }
801
802     private static <T> RpcResultBuilder<T> newFailedRpcResultBuilder(final String message, final Throwable cause) {
803         return RpcResultBuilder.<T>failed().withError(ErrorType.RPC, message, cause);
804     }
805
806     private static <T> RpcResult<T> newSuccessfulResult(final T data) {
807         return RpcResultBuilder.success(data).build();
808     }
809 }