Fix checkstyle violations in clustering-it-provider
[controller.git] / opendaylight / md-sal / samples / clustering-test-app / provider / src / main / java / org / opendaylight / controller / clustering / it / provider / MdsalLowLevelTestProvider.java
1 /*
2  * Copyright (c) 2017 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.clustering.it.provider;
10
11 import static akka.actor.ActorRef.noSender;
12
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSystem;
15 import akka.actor.PoisonPill;
16 import akka.actor.Props;
17 import akka.dispatch.OnComplete;
18 import akka.pattern.Patterns;
19 import com.google.common.base.Optional;
20 import com.google.common.base.Strings;
21 import com.google.common.util.concurrent.CheckedFuture;
22 import com.google.common.util.concurrent.Futures;
23 import com.google.common.util.concurrent.SettableFuture;
24 import java.io.PrintWriter;
25 import java.io.StringWriter;
26 import java.util.Collections;
27 import java.util.HashMap;
28 import java.util.Map;
29 import java.util.concurrent.ExecutionException;
30 import java.util.concurrent.Future;
31 import java.util.concurrent.TimeUnit;
32 import java.util.concurrent.TimeoutException;
33 import org.opendaylight.controller.cluster.ActorSystemProvider;
34 import org.opendaylight.controller.cluster.databroker.actors.dds.ClientLocalHistory;
35 import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
36 import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
37 import org.opendaylight.controller.cluster.databroker.actors.dds.SimpleDataStoreClientActor;
38 import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
39 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
40 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
41 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
42 import org.opendaylight.controller.cluster.sharding.DistributedShardFactory;
43 import org.opendaylight.controller.clustering.it.provider.impl.FlappingSingletonService;
44 import org.opendaylight.controller.clustering.it.provider.impl.GetConstantService;
45 import org.opendaylight.controller.clustering.it.provider.impl.IdIntsDOMDataTreeLIstener;
46 import org.opendaylight.controller.clustering.it.provider.impl.IdIntsListener;
47 import org.opendaylight.controller.clustering.it.provider.impl.PrefixLeaderHandler;
48 import org.opendaylight.controller.clustering.it.provider.impl.PrefixShardHandler;
49 import org.opendaylight.controller.clustering.it.provider.impl.ProduceTransactionsHandler;
50 import org.opendaylight.controller.clustering.it.provider.impl.PublishNotificationsTask;
51 import org.opendaylight.controller.clustering.it.provider.impl.RoutedGetConstantService;
52 import org.opendaylight.controller.clustering.it.provider.impl.SingletonGetConstantService;
53 import org.opendaylight.controller.clustering.it.provider.impl.WriteTransactionsHandler;
54 import org.opendaylight.controller.clustering.it.provider.impl.YnlListener;
55 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
56 import org.opendaylight.controller.md.sal.binding.api.NotificationService;
57 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
58 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
59 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
60 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
61 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeService;
62 import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationRegistration;
63 import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService;
64 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
65 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
66 import org.opendaylight.controller.sal.core.api.model.SchemaService;
67 import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
68 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
69 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
70 import org.opendaylight.mdsal.dom.api.DOMDataTreeLoopException;
71 import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
72 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
73 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
74 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.AddShardReplicaInput;
75 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomePrefixLeaderInput;
76 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsInput;
77 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutput;
78 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutputBuilder;
79 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CreatePrefixShardInput;
80 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.IsClientAbortedOutput;
81 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.OdlMdsalLowlevelControlService;
82 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsInput;
83 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutput;
84 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantInput;
85 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantInput;
86 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterDefaultConstantInput;
87 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantInput;
88 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardInput;
89 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemoveShardReplicaInput;
90 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownPrefixShardReplicaInput;
91 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownShardReplicaInput;
92 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsInput;
93 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlInput;
94 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantInput;
95 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutput;
96 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutputBuilder;
97 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutput;
98 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutputBuilder;
99 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutput;
100 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutputBuilder;
101 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlInput;
102 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlOutput;
103 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsInput;
104 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsOutput;
105 import org.opendaylight.yangtools.concepts.ListenerRegistration;
106 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
107 import org.opendaylight.yangtools.yang.common.RpcError;
108 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
109 import org.opendaylight.yangtools.yang.common.RpcResult;
110 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
111 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
112 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
113 import org.slf4j.Logger;
114 import org.slf4j.LoggerFactory;
115 import scala.concurrent.duration.FiniteDuration;
116
117 public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService {
118
119     private static final Logger LOG = LoggerFactory.getLogger(MdsalLowLevelTestProvider.class);
120     private static final org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType CONTROLLER_CONFIG =
121             org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;
122
123     private final RpcProviderRegistry rpcRegistry;
124     private final BindingAwareBroker.RpcRegistration<OdlMdsalLowlevelControlService> registration;
125     private final DistributedShardFactory distributedShardFactory;
126     private final DistributedDataStoreInterface configDataStore;
127     private final DOMDataTreeService domDataTreeService;
128     private final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer;
129     private final DOMDataBroker domDataBroker;
130     private final NotificationPublishService notificationPublishService;
131     private final NotificationService notificationService;
132     private final SchemaService schemaService;
133     private final ClusterSingletonServiceProvider singletonService;
134     private final DOMRpcProviderService domRpcService;
135     private final PrefixLeaderHandler prefixLeaderHandler;
136     private final PrefixShardHandler prefixShardHandler;
137     private final DOMDataTreeChangeService domDataTreeChangeService;
138     private final ActorSystem actorSystem;
139
140     private final Map<InstanceIdentifier<?>, DOMRpcImplementationRegistration<RoutedGetConstantService>>
141             routedRegistrations = new HashMap<>();
142
143     private final Map<String, ListenerRegistration<YnlListener>> ynlRegistrations = new HashMap<>();
144
145     private DOMRpcImplementationRegistration<GetConstantService> globalGetConstantRegistration = null;
146     private ClusterSingletonServiceRegistration getSingletonConstantRegistration;
147     private FlappingSingletonService flappingSingletonService;
148     private ListenerRegistration<DOMDataTreeChangeListener> dtclReg;
149     private IdIntsListener idIntsListener;
150     private final Map<String, PublishNotificationsTask> publishNotificationsTasks = new HashMap<>();
151     private ListenerRegistration<IdIntsDOMDataTreeLIstener> ddtlReg;
152     private IdIntsDOMDataTreeLIstener idIntsDdtl;
153
154
155
156     public MdsalLowLevelTestProvider(final RpcProviderRegistry rpcRegistry,
157                                      final DOMRpcProviderService domRpcService,
158                                      final ClusterSingletonServiceProvider singletonService,
159                                      final SchemaService schemaService,
160                                      final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer,
161                                      final NotificationPublishService notificationPublishService,
162                                      final NotificationService notificationService,
163                                      final DOMDataBroker domDataBroker,
164                                      final DOMDataTreeService domDataTreeService,
165                                      final DistributedShardFactory distributedShardFactory,
166                                      final DistributedDataStoreInterface configDataStore,
167                                      final ActorSystemProvider actorSystemProvider) {
168         this.rpcRegistry = rpcRegistry;
169         this.domRpcService = domRpcService;
170         this.singletonService = singletonService;
171         this.schemaService = schemaService;
172         this.bindingNormalizedNodeSerializer = bindingNormalizedNodeSerializer;
173         this.notificationPublishService = notificationPublishService;
174         this.notificationService = notificationService;
175         this.domDataBroker = domDataBroker;
176         this.domDataTreeService = domDataTreeService;
177         this.distributedShardFactory = distributedShardFactory;
178         this.configDataStore = configDataStore;
179         this.actorSystem = actorSystemProvider.getActorSystem();
180
181         this.prefixLeaderHandler = new PrefixLeaderHandler(domDataTreeService, bindingNormalizedNodeSerializer);
182
183         domDataTreeChangeService =
184                 (DOMDataTreeChangeService) domDataBroker.getSupportedExtensions().get(DOMDataTreeChangeService.class);
185
186         registration = rpcRegistry.addRpcImplementation(OdlMdsalLowlevelControlService.class, this);
187
188         prefixShardHandler = new PrefixShardHandler(distributedShardFactory, domDataTreeService,
189                 bindingNormalizedNodeSerializer);
190     }
191
192     @Override
193     @SuppressWarnings("checkstyle:IllegalCatch")
194     public Future<RpcResult<Void>> unregisterSingletonConstant() {
195         LOG.debug("unregister-singleton-constant");
196
197         if (getSingletonConstantRegistration == null) {
198             LOG.debug("No get-singleton-constant registration present.");
199             final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "missing-registration",
200                     "No get-singleton-constant rpc registration present.");
201             final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
202             return Futures.immediateFuture(result);
203         }
204
205         try {
206             getSingletonConstantRegistration.close();
207             getSingletonConstantRegistration = null;
208
209             return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
210         } catch (Exception e) {
211             LOG.debug("There was a problem closing the singleton constant service", e);
212             final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "error-closing",
213                     "There was a problem closing get-singleton-constant");
214             final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
215             return Futures.immediateFuture(result);
216         }
217     }
218
219     @Override
220     public Future<RpcResult<Void>> startPublishNotifications(final StartPublishNotificationsInput input) {
221         LOG.debug("publish-notifications, input: {}", input);
222
223         final PublishNotificationsTask task = new PublishNotificationsTask(notificationPublishService, input.getId(),
224                 input.getSeconds(), input.getNotificationsPerSecond());
225
226         publishNotificationsTasks.put(input.getId(), task);
227
228         task.start();
229
230         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
231     }
232
233     @Override
234     public Future<RpcResult<Void>> subscribeDtcl() {
235
236         if (dtclReg != null) {
237             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
238                     "There is already dataTreeChangeListener registered on id-ints list.");
239             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
240         }
241
242         idIntsListener = new IdIntsListener();
243
244         dtclReg = domDataTreeChangeService
245                 .registerDataTreeChangeListener(
246                         new org.opendaylight.controller.md.sal.dom.api.DOMDataTreeIdentifier(
247                                 CONTROLLER_CONFIG, WriteTransactionsHandler.ID_INT_YID),
248                         idIntsListener);
249
250         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
251     }
252
253     @Override
254     public Future<RpcResult<WriteTransactionsOutput>> writeTransactions(final WriteTransactionsInput input) {
255         LOG.debug("write-transactions, input: {}", input);
256         return WriteTransactionsHandler.start(domDataBroker, input);
257     }
258
259     @Override
260     public Future<RpcResult<IsClientAbortedOutput>> isClientAborted() {
261         return null;
262     }
263
264     @Override
265     public Future<RpcResult<Void>> removeShardReplica(final RemoveShardReplicaInput input) {
266         return null;
267     }
268
269     @Override
270     public Future<RpcResult<Void>> subscribeYnl(final SubscribeYnlInput input) {
271
272         LOG.debug("subscribe-ynl, input: {}", input);
273
274         if (ynlRegistrations.containsKey(input.getId())) {
275             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
276                     "There is already ynl listener registered for this id: " + input.getId());
277             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
278         }
279
280         ynlRegistrations.put(input.getId(),
281                 notificationService.registerNotificationListener(new YnlListener(input.getId())));
282
283         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
284     }
285
286     @Override
287     public Future<RpcResult<Void>> removePrefixShard(final RemovePrefixShardInput input) {
288         LOG.debug("remove-prefix-shard, input: {}", input);
289
290         return prefixShardHandler.onRemovePrefixShard(input);
291     }
292
293     @Override
294     public Future<RpcResult<Void>> becomePrefixLeader(final BecomePrefixLeaderInput input) {
295         LOG.debug("become-prefix-leader, input: {}", input);
296
297         return prefixLeaderHandler.makeLeaderLocal(input);
298     }
299
300     @Override
301     public Future<RpcResult<Void>> unregisterBoundConstant(final UnregisterBoundConstantInput input) {
302         LOG.debug("unregister-bound-constant, {}", input);
303
304         final DOMRpcImplementationRegistration<RoutedGetConstantService> rpcRegistration =
305                 routedRegistrations.remove(input.getContext());
306
307         if (rpcRegistration == null) {
308             LOG.debug("No get-contexted-constant registration for context: {}", input.getContext());
309             final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "missing-registration",
310                     "No get-constant rpc registration present.");
311             final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
312             return Futures.immediateFuture(result);
313         }
314
315         rpcRegistration.close();
316         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
317     }
318
319     @Override
320     public Future<RpcResult<Void>> registerSingletonConstant(final RegisterSingletonConstantInput input) {
321
322         LOG.debug("Received register-singleton-constant rpc, input: {}", input);
323
324         if (input.getConstant() == null) {
325             final RpcError error = RpcResultBuilder.newError(
326                     ErrorType.RPC, "Invalid input.", "Constant value is null");
327             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
328         }
329
330         getSingletonConstantRegistration =
331                 SingletonGetConstantService.registerNew(singletonService, domRpcService, input.getConstant());
332
333         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
334     }
335
336     @Override
337     public Future<RpcResult<Void>> registerDefaultConstant(final RegisterDefaultConstantInput input) {
338         return null;
339     }
340
341     @Override
342     public Future<RpcResult<Void>> unregisterConstant() {
343
344         if (globalGetConstantRegistration == null) {
345             final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "missing-registration",
346                     "No get-constant rpc registration present.");
347             final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
348             return Futures.immediateFuture(result);
349         }
350
351         globalGetConstantRegistration.close();
352         globalGetConstantRegistration = null;
353
354         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
355     }
356
357     @Override
358     public Future<RpcResult<UnregisterFlappingSingletonOutput>> unregisterFlappingSingleton() {
359         LOG.debug("unregister-flapping-singleton received.");
360
361         if (flappingSingletonService == null) {
362             final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "missing-registration",
363                     "No flapping-singleton registration present.");
364             final RpcResult<UnregisterFlappingSingletonOutput> result =
365                     RpcResultBuilder.<UnregisterFlappingSingletonOutput>failed().withRpcError(rpcError).build();
366             return Futures.immediateFuture(result);
367         }
368
369         final long flapCount = flappingSingletonService.setInactive();
370         flappingSingletonService = null;
371
372         final UnregisterFlappingSingletonOutput output =
373                 new UnregisterFlappingSingletonOutputBuilder().setFlapCount(flapCount).build();
374
375         return Futures.immediateFuture(RpcResultBuilder.success(output).build());
376     }
377
378     @Override
379     public Future<RpcResult<Void>> addShardReplica(final AddShardReplicaInput input) {
380         return null;
381     }
382
383     @Override
384     public Future<RpcResult<Void>> subscribeDdtl() {
385
386         if (ddtlReg != null) {
387             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
388                     "There is already dataTreeChangeListener registered on id-ints list.");
389             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
390         }
391
392         idIntsDdtl = new IdIntsDOMDataTreeLIstener();
393
394         try {
395             ddtlReg = domDataTreeService.registerListener(idIntsDdtl,
396                     Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION,
397                             ProduceTransactionsHandler.ID_INT_YID)),
398                     true, Collections.emptyList());
399         } catch (DOMDataTreeLoopException e) {
400             LOG.error("Failed to register DOMDataTreeListener.", e);
401
402         }
403
404         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
405     }
406
407     @Override
408     public Future<RpcResult<Void>> registerBoundConstant(final RegisterBoundConstantInput input) {
409         LOG.debug("register-bound-constant: {}", input);
410
411         if (input.getContext() == null) {
412             final RpcError error = RpcResultBuilder.newError(
413                     ErrorType.RPC, "Invalid input.", "Context value is null");
414             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
415         }
416
417         if (input.getConstant() == null) {
418             final RpcError error = RpcResultBuilder.newError(
419                     ErrorType.RPC, "Invalid input.", "Constant value is null");
420             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
421         }
422
423         if (routedRegistrations.containsKey(input.getContext())) {
424             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
425                     "There is already a rpc registered for context: " + input.getContext());
426             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
427         }
428
429         final DOMRpcImplementationRegistration<RoutedGetConstantService> rpcRegistration =
430                 RoutedGetConstantService.registerNew(bindingNormalizedNodeSerializer, domRpcService,
431                         input.getConstant(), input.getContext());
432
433         routedRegistrations.put(input.getContext(), rpcRegistration);
434         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
435     }
436
437     @Override
438     public Future<RpcResult<Void>> registerFlappingSingleton() {
439         LOG.debug("Received register-flapping-singleton.");
440
441         if (flappingSingletonService != null) {
442             final RpcError error = RpcResultBuilder.newError(
443                     ErrorType.RPC, "Registration present.", "flapping-singleton already registered");
444             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
445         }
446
447         flappingSingletonService = new FlappingSingletonService(singletonService);
448
449         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
450     }
451
452     @Override
453     public Future<RpcResult<UnsubscribeDtclOutput>> unsubscribeDtcl() {
454         LOG.debug("Received unsubscribe-dtcl");
455
456         if (idIntsListener == null || dtclReg == null) {
457             final RpcError error = RpcResultBuilder.newError(
458                     ErrorType.RPC, "Dtcl missing.", "No DataTreeChangeListener registered.");
459             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
460                     .withRpcError(error).build());
461         }
462
463         try {
464             idIntsListener.tryFinishProcessing().get(120, TimeUnit.SECONDS);
465         } catch (InterruptedException | ExecutionException | TimeoutException e) {
466             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "resource-denied-transport",
467                     "Unable to finish notification processing in 120 seconds.", "clustering-it", "clustering-it", e);
468             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
469                     .withRpcError(error).build());
470         }
471
472         dtclReg.close();
473         dtclReg = null;
474
475         if (!idIntsListener.hasTriggered()) {
476             final RpcError error = RpcResultBuilder.newError(
477                     ErrorType.APPLICATION, "No notification received.", "id-ints listener has not received"
478                             + "any notifications.");
479             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
480                     .withRpcError(error).build());
481         }
482
483         final DOMDataReadOnlyTransaction rTx = domDataBroker.newReadOnlyTransaction();
484         try {
485             final Optional<NormalizedNode<?, ?>> readResult =
486                     rTx.read(CONTROLLER_CONFIG, WriteTransactionsHandler.ID_INT_YID).checkedGet();
487
488             if (!readResult.isPresent()) {
489                 final RpcError error = RpcResultBuilder.newError(
490                         ErrorType.APPLICATION, "Final read empty.", "No data read from id-ints list.");
491                 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
492                         .withRpcError(error).build());
493             }
494
495             return Futures.immediateFuture(
496                     RpcResultBuilder.success(new UnsubscribeDtclOutputBuilder()
497                             .setCopyMatches(idIntsListener.checkEqual(readResult.get()))).build());
498
499         } catch (final ReadFailedException e) {
500             final RpcError error = RpcResultBuilder.newError(
501                     ErrorType.APPLICATION, "Read failed.", "Final read from id-ints failed.");
502             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
503                     .withRpcError(error).build());
504
505         }
506     }
507
508     @Override
509     public Future<RpcResult<Void>> createPrefixShard(final CreatePrefixShardInput input) {
510         LOG.debug("create-prefix-shard, input: {}", input);
511
512         return prefixShardHandler.onCreatePrefixShard(input);
513     }
514
515     @Override
516     public Future<RpcResult<Void>> deconfigureIdIntsShard() {
517         return null;
518     }
519
520     @Override
521     public Future<RpcResult<UnsubscribeYnlOutput>> unsubscribeYnl(final UnsubscribeYnlInput input) {
522         LOG.debug("Received unsubscribe-ynl, input: {}", input);
523
524         if (!ynlRegistrations.containsKey(input.getId())) {
525             final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "missing-registration",
526                     "No ynl listener with this id registered.");
527             final RpcResult<UnsubscribeYnlOutput> result =
528                     RpcResultBuilder.<UnsubscribeYnlOutput>failed().withRpcError(rpcError).build();
529             return Futures.immediateFuture(result);
530         }
531
532         final ListenerRegistration<YnlListener> reg = ynlRegistrations.remove(input.getId());
533         final UnsubscribeYnlOutput output = reg.getInstance().getOutput();
534
535         reg.close();
536
537         return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeYnlOutput>success().withResult(output).build());
538     }
539
540     @Override
541     public Future<RpcResult<CheckPublishNotificationsOutput>> checkPublishNotifications(
542             final CheckPublishNotificationsInput input) {
543
544         final PublishNotificationsTask task = publishNotificationsTasks.get(input.getId());
545
546         if (task == null) {
547             return Futures.immediateFuture(RpcResultBuilder.success(
548                     new CheckPublishNotificationsOutputBuilder().setActive(false)).build());
549         }
550
551         final CheckPublishNotificationsOutputBuilder checkPublishNotificationsOutputBuilder =
552                 new CheckPublishNotificationsOutputBuilder().setActive(!task.isFinished());
553
554         if (task.getLastError() != null) {
555             final StringWriter sw = new StringWriter();
556             final PrintWriter pw = new PrintWriter(sw);
557             LOG.error("Last error for {}", task, task.getLastError());
558             checkPublishNotificationsOutputBuilder.setLastError(task.getLastError().toString() + sw.toString());
559         }
560
561         final CheckPublishNotificationsOutput output =
562                 checkPublishNotificationsOutputBuilder.setPublishCount(task.getCurrentNotif()).build();
563
564         return Futures.immediateFuture(RpcResultBuilder.success(output).build());
565     }
566
567     @Override
568     public Future<RpcResult<ProduceTransactionsOutput>> produceTransactions(final ProduceTransactionsInput input) {
569         LOG.debug("producer-transactions, input: {}", input);
570         return ProduceTransactionsHandler.start(domDataTreeService, input);
571     }
572
573     @Override
574     public Future<RpcResult<Void>> shutdownShardReplica(final ShutdownShardReplicaInput input) {
575         LOG.debug("Received shutdown-shard-replica rpc, input: {}", input);
576
577         final String shardName = input.getShardName();
578         if (Strings.isNullOrEmpty(shardName)) {
579             final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "bad-element",
580                     "A valid shard name must be specified");
581             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(rpcError).build());
582         }
583
584         return shutdownShardGracefully(shardName);
585     }
586
587     @Override
588     public Future<RpcResult<Void>> shutdownPrefixShardReplica(final ShutdownPrefixShardReplicaInput input) {
589         LOG.debug("Received shutdown-prefix-shard-replica rpc, input: {}", input);
590
591         final InstanceIdentifier<?> shardPrefix = input.getPrefix();
592
593         if (shardPrefix == null) {
594             final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "bad-element",
595                     "A valid shard prefix must be specified");
596             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(rpcError).build());
597         }
598
599         final YangInstanceIdentifier shardPath = bindingNormalizedNodeSerializer.toYangInstanceIdentifier(shardPrefix);
600         final String cleanPrefixShardName = ClusterUtils.getCleanShardName(shardPath);
601
602         return shutdownShardGracefully(cleanPrefixShardName);
603     }
604
605     private SettableFuture<RpcResult<Void>> shutdownShardGracefully(final String shardName) {
606         final SettableFuture<RpcResult<Void>> rpcResult = SettableFuture.create();
607         final ActorContext context = configDataStore.getActorContext();
608
609         long timeoutInMS = Math.max(context.getDatastoreContext().getShardRaftConfig()
610                 .getElectionTimeOutInterval().$times(3).toMillis(), 10000);
611         final FiniteDuration duration = FiniteDuration.apply(timeoutInMS, TimeUnit.MILLISECONDS);
612         final scala.concurrent.Promise<Boolean> shutdownShardAsk = akka.dispatch.Futures.promise();
613
614         context.findLocalShardAsync(shardName).onComplete(new OnComplete<ActorRef>() {
615             @Override
616             public void onComplete(final Throwable throwable, final ActorRef actorRef) throws Throwable {
617                 if (throwable != null) {
618                     shutdownShardAsk.failure(throwable);
619                 } else {
620                     shutdownShardAsk.completeWith(Patterns.gracefulStop(actorRef, duration, Shutdown.INSTANCE));
621                 }
622             }
623         }, context.getClientDispatcher());
624
625         shutdownShardAsk.future().onComplete(new OnComplete<Boolean>() {
626             @Override
627             public void onComplete(final Throwable throwable, final Boolean gracefulStopResult) throws Throwable {
628                 if (throwable != null) {
629                     final RpcResult<Void> failedResult = RpcResultBuilder.<Void>failed()
630                             .withError(ErrorType.APPLICATION, "Failed to gracefully shutdown shard", throwable).build();
631                     rpcResult.set(failedResult);
632                 } else {
633                     // according to Patterns.gracefulStop API, we don't have to
634                     // check value of gracefulStopResult
635                     rpcResult.set(RpcResultBuilder.<Void>success().build());
636                 }
637             }
638         }, context.getClientDispatcher());
639         return rpcResult;
640     }
641
642     @Override
643     public Future<RpcResult<Void>> registerConstant(final RegisterConstantInput input) {
644
645         LOG.debug("Received register-constant rpc, input: {}", input);
646
647         if (input.getConstant() == null) {
648             final RpcError error = RpcResultBuilder.newError(
649                     ErrorType.RPC, "Invalid input.", "Constant value is null");
650             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
651         }
652
653         if (globalGetConstantRegistration != null) {
654             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
655                     "There is already a get-constant rpc registered.");
656             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
657         }
658
659         globalGetConstantRegistration = GetConstantService.registerNew(domRpcService, input.getConstant());
660         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
661     }
662
663     @Override
664     public Future<RpcResult<Void>> unregisterDefaultConstant() {
665         return null;
666     }
667
668     @Override
669     @SuppressWarnings("checkstyle:IllegalCatch")
670     public Future<RpcResult<UnsubscribeDdtlOutput>> unsubscribeDdtl() {
671         LOG.debug("Received unsubscribe-ddtl.");
672
673         if (idIntsDdtl == null || ddtlReg == null) {
674             final RpcError error = RpcResultBuilder.newError(
675                     ErrorType.RPC, "Ddtl missing.", "No DOMDataTreeListener registered.");
676             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
677                     .withRpcError(error).build());
678         }
679
680         try {
681             idIntsDdtl.tryFinishProcessing().get(120, TimeUnit.SECONDS);
682         } catch (InterruptedException | ExecutionException | TimeoutException e) {
683             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "resource-denied-transport",
684                     "Unable to finish notification processing in 120 seconds.", "clustering-it", "clustering-it", e);
685             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
686                     .withRpcError(error).build());
687         }
688
689         ddtlReg.close();
690         ddtlReg = null;
691
692         if (!idIntsDdtl.hasTriggered()) {
693             final RpcError error = RpcResultBuilder.newError(
694                     ErrorType.APPLICATION, "No notification received.", "id-ints listener has not received"
695                             + "any notifications.");
696             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
697                     .withRpcError(error).build());
698         }
699
700         final String shardName = ClusterUtils.getCleanShardName(ProduceTransactionsHandler.ID_INTS_YID);
701         LOG.debug("Creating distributed datastore client for shard {}", shardName);
702
703         final ActorContext actorContext = configDataStore.getActorContext();
704         final Props distributedDataStoreClientProps =
705                 SimpleDataStoreClientActor.props(actorContext.getCurrentMemberName(),
706                         "Shard-" + shardName, actorContext, shardName);
707
708         final ActorRef clientActor = actorSystem.actorOf(distributedDataStoreClientProps);
709         final DataStoreClient distributedDataStoreClient;
710         try {
711             distributedDataStoreClient = SimpleDataStoreClientActor
712                     .getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS);
713         } catch (RuntimeException e) {
714             LOG.error("Failed to get actor for {}", distributedDataStoreClientProps, e);
715             clientActor.tell(PoisonPill.getInstance(), noSender());
716             final RpcError error = RpcResultBuilder.newError(
717                     ErrorType.APPLICATION, "Unable to create ds client for read.",
718                     "Unable to create ds client for read.");
719             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
720                     .withRpcError(error).build());
721         }
722
723         final ClientLocalHistory localHistory = distributedDataStoreClient.createLocalHistory();
724         final ClientTransaction tx = localHistory.createTransaction();
725         final CheckedFuture<Optional<NormalizedNode<?, ?>>,
726                 org.opendaylight.mdsal.common.api.ReadFailedException> read =
727                 tx.read(YangInstanceIdentifier.of(ProduceTransactionsHandler.ID_INT));
728
729         tx.abort();
730         localHistory.close();
731         try {
732             final Optional<NormalizedNode<?, ?>> optional = read.checkedGet();
733             if (!optional.isPresent()) {
734                 LOG.warn("Final read from client is empty.");
735                 final RpcError error = RpcResultBuilder.newError(
736                         ErrorType.APPLICATION, "Read failed.", "Final read from id-ints is empty.");
737                 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
738                         .withRpcError(error).build());
739             }
740
741             return Futures.immediateFuture(
742                     RpcResultBuilder.success(new UnsubscribeDdtlOutputBuilder()
743                             .setCopyMatches(idIntsDdtl.checkEqual(optional.get()))).build());
744
745         } catch (org.opendaylight.mdsal.common.api.ReadFailedException e) {
746             LOG.error("Unable to read data to verify ddtl data.", e);
747             final RpcError error = RpcResultBuilder.newError(
748                     ErrorType.APPLICATION, "Read failed.", "Final read from id-ints failed.");
749             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
750                     .withRpcError(error).build());
751         } finally {
752             distributedDataStoreClient.close();
753             clientActor.tell(PoisonPill.getInstance(), noSender());
754         }
755     }
756 }