Bug 4564: Implement clustering backup-datastore RPC
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / admin / ClusterAdminRpcService.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.admin;
9
10 import akka.actor.ActorRef;
11 import akka.dispatch.OnComplete;
12 import akka.pattern.Patterns;
13 import akka.util.Timeout;
14 import com.google.common.base.Strings;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.Futures;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import com.google.common.util.concurrent.SettableFuture;
19 import java.io.FileOutputStream;
20 import java.util.ArrayList;
21 import java.util.List;
22 import java.util.concurrent.Future;
23 import java.util.concurrent.TimeUnit;
24 import org.apache.commons.lang3.SerializationUtils;
25 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
26 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
27 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
28 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RpcRegistration;
29 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddShardReplicaInput;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.BackupDatastoreInput;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ClusterAdminService;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ConvertMembersToNonvotingForAllShardsInput;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ConvertMembersToVotingForAllShardsInput;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaInput;
36 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
37 import org.opendaylight.yangtools.yang.common.RpcResult;
38 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41
42 /**
43  * Implements the yang RPCs defined in the generated ClusterAdminService interface.
44  *
45  * @author Thomas Pantelis
46  */
47 public class ClusterAdminRpcService implements ClusterAdminService, AutoCloseable {
48     private static final Logger LOG = LoggerFactory.getLogger(ClusterAdminRpcService.class);
49
50     private final DistributedDataStore configDataStore;
51     private final DistributedDataStore operDataStore;
52     private RpcRegistration<ClusterAdminService> rpcRegistration;
53
54     public ClusterAdminRpcService(DistributedDataStore configDataStore, DistributedDataStore operDataStore) {
55         this.configDataStore = configDataStore;
56         this.operDataStore = operDataStore;
57     }
58
59     public void start(RpcProviderRegistry rpcProviderRegistry) {
60         LOG.debug("ClusterAdminRpcService starting");
61
62         rpcRegistration = rpcProviderRegistry.addRpcImplementation(ClusterAdminService.class, this);
63     }
64
65     @Override
66     public void close() {
67         if(rpcRegistration != null) {
68             rpcRegistration.close();
69         }
70     }
71
72     @Override
73     public Future<RpcResult<Void>> addShardReplica(AddShardReplicaInput input) {
74         // TODO implement
75         return RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, "operation-not-supported",
76                 "Not implemented yet").buildFuture();
77     }
78
79     @Override
80     public Future<RpcResult<Void>> removeShardReplica(RemoveShardReplicaInput input) {
81         // TODO implement
82         return RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, "operation-not-supported",
83                 "Not implemented yet").buildFuture();
84     }
85
86     @Override
87     public Future<RpcResult<Void>> addReplicasForAllShards() {
88         // TODO implement
89         return RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, "operation-not-supported",
90                 "Not implemented yet").buildFuture();
91     }
92
93     @Override
94     public Future<RpcResult<Void>> removeAllShardReplicas() {
95         // TODO implement
96         return RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, "operation-not-supported",
97                 "Not implemented yet").buildFuture();
98     }
99
100     @Override
101     public Future<RpcResult<Void>> convertMembersToVotingForAllShards(ConvertMembersToVotingForAllShardsInput input) {
102         // TODO implement
103         return RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, "operation-not-supported",
104                 "Not implemented yet").buildFuture();
105     }
106
107     @Override
108     public Future<RpcResult<Void>> convertMembersToNonvotingForAllShards(
109             ConvertMembersToNonvotingForAllShardsInput input) {
110         // TODO implement
111         return RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, "operation-not-supported",
112                 "Not implemented yet").buildFuture();
113     }
114
115     @SuppressWarnings("unchecked")
116     @Override
117     public Future<RpcResult<Void>> backupDatastore(final BackupDatastoreInput input) {
118         LOG.debug("backupDatastore: {}", input);
119
120         if(Strings.isNullOrEmpty(input.getFilePath())) {
121             return newFailedRpcResultBuilder("A valid file path must be specified").buildFuture();
122         }
123
124         Timeout timeout = new Timeout(1, TimeUnit.MINUTES);
125         ListenableFuture<DatastoreSnapshot> configFuture = ask(configDataStore.getActorContext().getShardManager(),
126                 GetSnapshot.INSTANCE, timeout);
127         ListenableFuture<DatastoreSnapshot> operFuture = ask(operDataStore.getActorContext().getShardManager(),
128                 GetSnapshot.INSTANCE, timeout);
129
130         final SettableFuture<RpcResult<Void>> returnFuture = SettableFuture.create();
131         Futures.addCallback(Futures.allAsList(configFuture, operFuture), new FutureCallback<List<DatastoreSnapshot>>() {
132             @Override
133             public void onSuccess(List<DatastoreSnapshot> snapshots) {
134                 saveSnapshotsToFile(new ArrayList<>(snapshots), input.getFilePath(), returnFuture);
135             }
136
137             @Override
138             public void onFailure(Throwable failure) {
139                 onDatastoreBackupFilure(input.getFilePath(), returnFuture, failure);
140             }
141         });
142
143         return returnFuture;
144     }
145
146     private static void saveSnapshotsToFile(ArrayList<DatastoreSnapshot> snapshots, String fileName,
147             SettableFuture<RpcResult<Void>> returnFuture) {
148         try(FileOutputStream fos = new FileOutputStream(fileName)) {
149             SerializationUtils.serialize(snapshots, fos);
150
151             returnFuture.set(newSuccessfulResult());
152             LOG.info("Successfully backed up datastore to file {}", fileName);
153         } catch(Exception e) {
154             onDatastoreBackupFilure(fileName, returnFuture, e);
155         }
156     }
157
158     private static void onDatastoreBackupFilure(String fileName, final SettableFuture<RpcResult<Void>> returnFuture,
159             Throwable failure) {
160         String msg = String.format("Failed to back up datastore to file %s", fileName);
161         LOG.error(msg, failure);
162         returnFuture.set(newFailedRpcResultBuilder(msg, failure).build());
163     }
164
165     private <T> ListenableFuture<T> ask(ActorRef actor, Object message, Timeout timeout) {
166         final SettableFuture<T> returnFuture = SettableFuture.create();
167
168         @SuppressWarnings("unchecked")
169         scala.concurrent.Future<T> askFuture = (scala.concurrent.Future<T>) Patterns.ask(actor, message, timeout);
170         askFuture.onComplete(new OnComplete<T>() {
171             @Override
172             public void onComplete(Throwable failure, T resp) {
173                 if(failure != null) {
174                     returnFuture.setException(failure);
175                 } else {
176                     returnFuture.set(resp);
177                 }
178             }
179         }, configDataStore.getActorContext().getClientDispatcher());
180
181         return returnFuture;
182     }
183
184     private static RpcResultBuilder<Void> newFailedRpcResultBuilder(String message) {
185         return newFailedRpcResultBuilder(message, null);
186     }
187
188     private static RpcResultBuilder<Void> newFailedRpcResultBuilder(String message, Throwable cause) {
189         return RpcResultBuilder.<Void>failed().withError(ErrorType.RPC, message, cause);
190     }
191
192     private static RpcResult<Void> newSuccessfulResult() {
193         return RpcResultBuilder.<Void>success().build();
194     }
195 }