Bug 2187: Add datastoreType to add-shard-replica RPC
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / admin / ClusterAdminRpcService.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.admin;
9
10 import akka.actor.ActorRef;
11 import akka.actor.Status.Success;
12 import akka.dispatch.OnComplete;
13 import akka.pattern.Patterns;
14 import akka.util.Timeout;
15 import com.google.common.base.Strings;
16 import com.google.common.util.concurrent.FutureCallback;
17 import com.google.common.util.concurrent.Futures;
18 import com.google.common.util.concurrent.ListenableFuture;
19 import com.google.common.util.concurrent.SettableFuture;
20 import java.io.FileOutputStream;
21 import java.util.List;
22 import java.util.concurrent.Future;
23 import java.util.concurrent.TimeUnit;
24 import org.apache.commons.lang3.SerializationUtils;
25 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
26 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
27 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
28 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshotList;
29 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
30 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RpcRegistration;
31 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddShardReplicaInput;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.BackupDatastoreInput;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ClusterAdminService;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ConvertMembersToNonvotingForAllShardsInput;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ConvertMembersToVotingForAllShardsInput;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DataStoreType;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaInput;
39 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
40 import org.opendaylight.yangtools.yang.common.RpcResult;
41 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
44
45 /**
46  * Implements the yang RPCs defined in the generated ClusterAdminService interface.
47  *
48  * @author Thomas Pantelis
49  */
50 public class ClusterAdminRpcService implements ClusterAdminService, AutoCloseable {
51     private static final Logger LOG = LoggerFactory.getLogger(ClusterAdminRpcService.class);
52
53     private final DistributedDataStore configDataStore;
54     private final DistributedDataStore operDataStore;
55     private RpcRegistration<ClusterAdminService> rpcRegistration;
56
57     public ClusterAdminRpcService(DistributedDataStore configDataStore, DistributedDataStore operDataStore) {
58         this.configDataStore = configDataStore;
59         this.operDataStore = operDataStore;
60     }
61
62     public void start(RpcProviderRegistry rpcProviderRegistry) {
63         LOG.debug("ClusterAdminRpcService starting");
64
65         rpcRegistration = rpcProviderRegistry.addRpcImplementation(ClusterAdminService.class, this);
66     }
67
68     @Override
69     public void close() {
70         if(rpcRegistration != null) {
71             rpcRegistration.close();
72         }
73     }
74
75     @Override
76     public Future<RpcResult<Void>> addShardReplica(final AddShardReplicaInput input) {
77         final String shardName = input.getShardName();
78         if(Strings.isNullOrEmpty(shardName)) {
79             return newFailedRpcResultBuilder("A valid shard name must be specified").buildFuture();
80         }
81
82         DataStoreType dataStoreType = input.getDataStoreType();
83         if(dataStoreType == null) {
84             return newFailedRpcResultBuilder("A valid DataStoreType must be specified").buildFuture();
85         }
86
87         LOG.info("Adding replica for shard {}", shardName);
88
89         final SettableFuture<RpcResult<Void>> returnFuture = SettableFuture.create();
90         ListenableFuture<Success> future = sendMessageToShardManager(dataStoreType, new AddShardReplica(shardName));
91         Futures.addCallback(future, new FutureCallback<Success>() {
92             @Override
93             public void onSuccess(Success success) {
94                 LOG.info("Successfully added replica for shard {}", shardName);
95                 returnFuture.set(newSuccessfulResult());
96             }
97
98             @Override
99             public void onFailure(Throwable failure) {
100                 onMessageFailure(String.format("Failed to add replica for shard %s", shardName),
101                         returnFuture, failure);
102             }
103         });
104
105         return returnFuture;
106     }
107
108     @Override
109     public Future<RpcResult<Void>> removeShardReplica(RemoveShardReplicaInput input) {
110         // TODO implement
111         return RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, "operation-not-supported",
112                 "Not implemented yet").buildFuture();
113     }
114
115     @Override
116     public Future<RpcResult<Void>> addReplicasForAllShards() {
117         // TODO implement
118         return RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, "operation-not-supported",
119                 "Not implemented yet").buildFuture();
120     }
121
122     @Override
123     public Future<RpcResult<Void>> removeAllShardReplicas() {
124         // TODO implement
125         return RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, "operation-not-supported",
126                 "Not implemented yet").buildFuture();
127     }
128
129     @Override
130     public Future<RpcResult<Void>> convertMembersToVotingForAllShards(ConvertMembersToVotingForAllShardsInput input) {
131         // TODO implement
132         return RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, "operation-not-supported",
133                 "Not implemented yet").buildFuture();
134     }
135
136     @Override
137     public Future<RpcResult<Void>> convertMembersToNonvotingForAllShards(
138             ConvertMembersToNonvotingForAllShardsInput input) {
139         // TODO implement
140         return RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, "operation-not-supported",
141                 "Not implemented yet").buildFuture();
142     }
143
144     @Override
145     public Future<RpcResult<Void>> backupDatastore(final BackupDatastoreInput input) {
146         LOG.debug("backupDatastore: {}", input);
147
148         if(Strings.isNullOrEmpty(input.getFilePath())) {
149             return newFailedRpcResultBuilder("A valid file path must be specified").buildFuture();
150         }
151
152         final SettableFuture<RpcResult<Void>> returnFuture = SettableFuture.create();
153         ListenableFuture<List<DatastoreSnapshot>> future = sendMessageToShardManagers(GetSnapshot.INSTANCE);
154         Futures.addCallback(future, new FutureCallback<List<DatastoreSnapshot>>() {
155             @Override
156             public void onSuccess(List<DatastoreSnapshot> snapshots) {
157                 saveSnapshotsToFile(new DatastoreSnapshotList(snapshots), input.getFilePath(), returnFuture);
158             }
159
160             @Override
161             public void onFailure(Throwable failure) {
162                 onDatastoreBackupFailure(input.getFilePath(), returnFuture, failure);
163             }
164         });
165
166         return returnFuture;
167     }
168
169     @SuppressWarnings("unchecked")
170     private <T> ListenableFuture<List<T>> sendMessageToShardManagers(Object message) {
171         Timeout timeout = new Timeout(1, TimeUnit.MINUTES);
172         ListenableFuture<T> configFuture = ask(configDataStore.getActorContext().getShardManager(), message, timeout);
173         ListenableFuture<T> operFuture = ask(operDataStore.getActorContext().getShardManager(), message, timeout);
174
175         return Futures.allAsList(configFuture, operFuture);
176     }
177
178     private <T> ListenableFuture<T> sendMessageToShardManager(DataStoreType dataStoreType, Object message) {
179         ActorRef shardManager = dataStoreType == DataStoreType.Config ?
180                 configDataStore.getActorContext().getShardManager() : operDataStore.getActorContext().getShardManager();
181         return ask(shardManager, message, new Timeout(1, TimeUnit.MINUTES));
182     }
183
184     private static void saveSnapshotsToFile(DatastoreSnapshotList snapshots, String fileName,
185             SettableFuture<RpcResult<Void>> returnFuture) {
186         try(FileOutputStream fos = new FileOutputStream(fileName)) {
187             SerializationUtils.serialize(snapshots, fos);
188
189             returnFuture.set(newSuccessfulResult());
190             LOG.info("Successfully backed up datastore to file {}", fileName);
191         } catch(Exception e) {
192             onDatastoreBackupFailure(fileName, returnFuture, e);
193         }
194     }
195
196     private static void onDatastoreBackupFailure(String fileName, SettableFuture<RpcResult<Void>> returnFuture,
197             Throwable failure) {
198         onMessageFailure(String.format("Failed to back up datastore to file %s", fileName), returnFuture, failure);
199     }
200
201     private static void onMessageFailure(String msg, final SettableFuture<RpcResult<Void>> returnFuture,
202             Throwable failure) {
203         LOG.error(msg, failure);
204         returnFuture.set(newFailedRpcResultBuilder(String.format("%s: %s", msg, failure.getMessage())).build());
205     }
206
207     private <T> ListenableFuture<T> ask(ActorRef actor, Object message, Timeout timeout) {
208         final SettableFuture<T> returnFuture = SettableFuture.create();
209
210         @SuppressWarnings("unchecked")
211         scala.concurrent.Future<T> askFuture = (scala.concurrent.Future<T>) Patterns.ask(actor, message, timeout);
212         askFuture.onComplete(new OnComplete<T>() {
213             @Override
214             public void onComplete(Throwable failure, T resp) {
215                 if(failure != null) {
216                     returnFuture.setException(failure);
217                 } else {
218                     returnFuture.set(resp);
219                 }
220             }
221         }, configDataStore.getActorContext().getClientDispatcher());
222
223         return returnFuture;
224     }
225
226     private static RpcResultBuilder<Void> newFailedRpcResultBuilder(String message) {
227         return newFailedRpcResultBuilder(message, null);
228     }
229
230     private static RpcResultBuilder<Void> newFailedRpcResultBuilder(String message, Throwable cause) {
231         return RpcResultBuilder.<Void>failed().withError(ErrorType.RPC, message, cause);
232     }
233
234     private static RpcResult<Void> newSuccessfulResult() {
235         return RpcResultBuilder.<Void>success().build();
236     }
237 }