Fix findbugs violations in md-sal - part 2
[controller.git] / opendaylight / md-sal / samples / clustering-test-app / provider / src / main / java / org / opendaylight / controller / clustering / it / provider / MdsalLowLevelTestProvider.java
1 /*
2  * Copyright (c) 2017 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.clustering.it.provider;
10
11 import static akka.actor.ActorRef.noSender;
12
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSystem;
15 import akka.actor.PoisonPill;
16 import akka.actor.Props;
17 import akka.dispatch.OnComplete;
18 import akka.pattern.Patterns;
19 import com.google.common.base.Optional;
20 import com.google.common.base.Strings;
21 import com.google.common.util.concurrent.CheckedFuture;
22 import com.google.common.util.concurrent.Futures;
23 import com.google.common.util.concurrent.SettableFuture;
24 import java.util.Collections;
25 import java.util.HashMap;
26 import java.util.Map;
27 import java.util.concurrent.ExecutionException;
28 import java.util.concurrent.Future;
29 import java.util.concurrent.TimeUnit;
30 import java.util.concurrent.TimeoutException;
31 import org.opendaylight.controller.cluster.ActorSystemProvider;
32 import org.opendaylight.controller.cluster.databroker.actors.dds.ClientLocalHistory;
33 import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
34 import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
35 import org.opendaylight.controller.cluster.databroker.actors.dds.SimpleDataStoreClientActor;
36 import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
37 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
38 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
39 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
40 import org.opendaylight.controller.cluster.sharding.DistributedShardFactory;
41 import org.opendaylight.controller.clustering.it.provider.impl.FlappingSingletonService;
42 import org.opendaylight.controller.clustering.it.provider.impl.GetConstantService;
43 import org.opendaylight.controller.clustering.it.provider.impl.IdIntsDOMDataTreeLIstener;
44 import org.opendaylight.controller.clustering.it.provider.impl.IdIntsListener;
45 import org.opendaylight.controller.clustering.it.provider.impl.PrefixLeaderHandler;
46 import org.opendaylight.controller.clustering.it.provider.impl.PrefixShardHandler;
47 import org.opendaylight.controller.clustering.it.provider.impl.ProduceTransactionsHandler;
48 import org.opendaylight.controller.clustering.it.provider.impl.PublishNotificationsTask;
49 import org.opendaylight.controller.clustering.it.provider.impl.RoutedGetConstantService;
50 import org.opendaylight.controller.clustering.it.provider.impl.SingletonGetConstantService;
51 import org.opendaylight.controller.clustering.it.provider.impl.WriteTransactionsHandler;
52 import org.opendaylight.controller.clustering.it.provider.impl.YnlListener;
53 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
54 import org.opendaylight.controller.md.sal.binding.api.NotificationService;
55 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
56 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
57 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
58 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
59 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeService;
60 import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationRegistration;
61 import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService;
62 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
63 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
64 import org.opendaylight.controller.sal.core.api.model.SchemaService;
65 import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
66 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
67 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
68 import org.opendaylight.mdsal.dom.api.DOMDataTreeLoopException;
69 import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
70 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
71 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
72 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.AddShardReplicaInput;
73 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomePrefixLeaderInput;
74 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsInput;
75 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutput;
76 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutputBuilder;
77 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CreatePrefixShardInput;
78 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.IsClientAbortedOutput;
79 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.OdlMdsalLowlevelControlService;
80 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsInput;
81 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutput;
82 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantInput;
83 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantInput;
84 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterDefaultConstantInput;
85 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantInput;
86 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardInput;
87 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemoveShardReplicaInput;
88 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownPrefixShardReplicaInput;
89 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownShardReplicaInput;
90 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsInput;
91 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlInput;
92 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantInput;
93 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutput;
94 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutputBuilder;
95 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutput;
96 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutputBuilder;
97 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutput;
98 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutputBuilder;
99 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlInput;
100 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlOutput;
101 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsInput;
102 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsOutput;
103 import org.opendaylight.yangtools.concepts.ListenerRegistration;
104 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
105 import org.opendaylight.yangtools.yang.common.RpcError;
106 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
107 import org.opendaylight.yangtools.yang.common.RpcResult;
108 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
109 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
110 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
111 import org.slf4j.Logger;
112 import org.slf4j.LoggerFactory;
113 import scala.concurrent.duration.FiniteDuration;
114
115 public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService {
116
117     private static final Logger LOG = LoggerFactory.getLogger(MdsalLowLevelTestProvider.class);
118     private static final org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType CONTROLLER_CONFIG =
119             org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;
120
121     private final RpcProviderRegistry rpcRegistry;
122     private final BindingAwareBroker.RpcRegistration<OdlMdsalLowlevelControlService> registration;
123     private final DistributedShardFactory distributedShardFactory;
124     private final DistributedDataStoreInterface configDataStore;
125     private final DOMDataTreeService domDataTreeService;
126     private final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer;
127     private final DOMDataBroker domDataBroker;
128     private final NotificationPublishService notificationPublishService;
129     private final NotificationService notificationService;
130     private final SchemaService schemaService;
131     private final ClusterSingletonServiceProvider singletonService;
132     private final DOMRpcProviderService domRpcService;
133     private final PrefixLeaderHandler prefixLeaderHandler;
134     private final PrefixShardHandler prefixShardHandler;
135     private final DOMDataTreeChangeService domDataTreeChangeService;
136     private final ActorSystem actorSystem;
137
138     private final Map<InstanceIdentifier<?>, DOMRpcImplementationRegistration<RoutedGetConstantService>>
139             routedRegistrations = new HashMap<>();
140
141     private final Map<String, ListenerRegistration<YnlListener>> ynlRegistrations = new HashMap<>();
142
143     private DOMRpcImplementationRegistration<GetConstantService> globalGetConstantRegistration = null;
144     private ClusterSingletonServiceRegistration getSingletonConstantRegistration;
145     private FlappingSingletonService flappingSingletonService;
146     private ListenerRegistration<DOMDataTreeChangeListener> dtclReg;
147     private IdIntsListener idIntsListener;
148     private final Map<String, PublishNotificationsTask> publishNotificationsTasks = new HashMap<>();
149     private ListenerRegistration<IdIntsDOMDataTreeLIstener> ddtlReg;
150     private IdIntsDOMDataTreeLIstener idIntsDdtl;
151
152
153
154     public MdsalLowLevelTestProvider(final RpcProviderRegistry rpcRegistry,
155                                      final DOMRpcProviderService domRpcService,
156                                      final ClusterSingletonServiceProvider singletonService,
157                                      final SchemaService schemaService,
158                                      final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer,
159                                      final NotificationPublishService notificationPublishService,
160                                      final NotificationService notificationService,
161                                      final DOMDataBroker domDataBroker,
162                                      final DOMDataTreeService domDataTreeService,
163                                      final DistributedShardFactory distributedShardFactory,
164                                      final DistributedDataStoreInterface configDataStore,
165                                      final ActorSystemProvider actorSystemProvider) {
166         this.rpcRegistry = rpcRegistry;
167         this.domRpcService = domRpcService;
168         this.singletonService = singletonService;
169         this.schemaService = schemaService;
170         this.bindingNormalizedNodeSerializer = bindingNormalizedNodeSerializer;
171         this.notificationPublishService = notificationPublishService;
172         this.notificationService = notificationService;
173         this.domDataBroker = domDataBroker;
174         this.domDataTreeService = domDataTreeService;
175         this.distributedShardFactory = distributedShardFactory;
176         this.configDataStore = configDataStore;
177         this.actorSystem = actorSystemProvider.getActorSystem();
178
179         this.prefixLeaderHandler = new PrefixLeaderHandler(domDataTreeService, bindingNormalizedNodeSerializer);
180
181         domDataTreeChangeService =
182                 (DOMDataTreeChangeService) domDataBroker.getSupportedExtensions().get(DOMDataTreeChangeService.class);
183
184         registration = rpcRegistry.addRpcImplementation(OdlMdsalLowlevelControlService.class, this);
185
186         prefixShardHandler = new PrefixShardHandler(distributedShardFactory, domDataTreeService,
187                 bindingNormalizedNodeSerializer);
188     }
189
190     @Override
191     @SuppressWarnings("checkstyle:IllegalCatch")
192     public Future<RpcResult<Void>> unregisterSingletonConstant() {
193         LOG.debug("unregister-singleton-constant");
194
195         if (getSingletonConstantRegistration == null) {
196             LOG.debug("No get-singleton-constant registration present.");
197             final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "missing-registration",
198                     "No get-singleton-constant rpc registration present.");
199             final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
200             return Futures.immediateFuture(result);
201         }
202
203         try {
204             getSingletonConstantRegistration.close();
205             getSingletonConstantRegistration = null;
206
207             return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
208         } catch (Exception e) {
209             LOG.debug("There was a problem closing the singleton constant service", e);
210             final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "error-closing",
211                     "There was a problem closing get-singleton-constant");
212             final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
213             return Futures.immediateFuture(result);
214         }
215     }
216
217     @Override
218     public Future<RpcResult<Void>> startPublishNotifications(final StartPublishNotificationsInput input) {
219         LOG.debug("publish-notifications, input: {}", input);
220
221         final PublishNotificationsTask task = new PublishNotificationsTask(notificationPublishService, input.getId(),
222                 input.getSeconds(), input.getNotificationsPerSecond());
223
224         publishNotificationsTasks.put(input.getId(), task);
225
226         task.start();
227
228         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
229     }
230
231     @Override
232     public Future<RpcResult<Void>> subscribeDtcl() {
233
234         if (dtclReg != null) {
235             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
236                     "There is already dataTreeChangeListener registered on id-ints list.");
237             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
238         }
239
240         idIntsListener = new IdIntsListener();
241
242         dtclReg = domDataTreeChangeService
243                 .registerDataTreeChangeListener(
244                         new org.opendaylight.controller.md.sal.dom.api.DOMDataTreeIdentifier(
245                                 CONTROLLER_CONFIG, WriteTransactionsHandler.ID_INT_YID),
246                         idIntsListener);
247
248         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
249     }
250
251     @Override
252     public Future<RpcResult<WriteTransactionsOutput>> writeTransactions(final WriteTransactionsInput input) {
253         LOG.debug("write-transactions, input: {}", input);
254         return WriteTransactionsHandler.start(domDataBroker, input);
255     }
256
257     @Override
258     public Future<RpcResult<IsClientAbortedOutput>> isClientAborted() {
259         return null;
260     }
261
262     @Override
263     public Future<RpcResult<Void>> removeShardReplica(final RemoveShardReplicaInput input) {
264         return null;
265     }
266
267     @Override
268     public Future<RpcResult<Void>> subscribeYnl(final SubscribeYnlInput input) {
269
270         LOG.debug("subscribe-ynl, input: {}", input);
271
272         if (ynlRegistrations.containsKey(input.getId())) {
273             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
274                     "There is already ynl listener registered for this id: " + input.getId());
275             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
276         }
277
278         ynlRegistrations.put(input.getId(),
279                 notificationService.registerNotificationListener(new YnlListener(input.getId())));
280
281         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
282     }
283
284     @Override
285     public Future<RpcResult<Void>> removePrefixShard(final RemovePrefixShardInput input) {
286         LOG.debug("remove-prefix-shard, input: {}", input);
287
288         return prefixShardHandler.onRemovePrefixShard(input);
289     }
290
291     @Override
292     public Future<RpcResult<Void>> becomePrefixLeader(final BecomePrefixLeaderInput input) {
293         LOG.debug("become-prefix-leader, input: {}", input);
294
295         return prefixLeaderHandler.makeLeaderLocal(input);
296     }
297
298     @Override
299     public Future<RpcResult<Void>> unregisterBoundConstant(final UnregisterBoundConstantInput input) {
300         LOG.debug("unregister-bound-constant, {}", input);
301
302         final DOMRpcImplementationRegistration<RoutedGetConstantService> rpcRegistration =
303                 routedRegistrations.remove(input.getContext());
304
305         if (rpcRegistration == null) {
306             LOG.debug("No get-contexted-constant registration for context: {}", input.getContext());
307             final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "missing-registration",
308                     "No get-constant rpc registration present.");
309             final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
310             return Futures.immediateFuture(result);
311         }
312
313         rpcRegistration.close();
314         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
315     }
316
317     @Override
318     public Future<RpcResult<Void>> registerSingletonConstant(final RegisterSingletonConstantInput input) {
319
320         LOG.debug("Received register-singleton-constant rpc, input: {}", input);
321
322         if (input.getConstant() == null) {
323             final RpcError error = RpcResultBuilder.newError(
324                     ErrorType.RPC, "Invalid input.", "Constant value is null");
325             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
326         }
327
328         getSingletonConstantRegistration =
329                 SingletonGetConstantService.registerNew(singletonService, domRpcService, input.getConstant());
330
331         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
332     }
333
334     @Override
335     public Future<RpcResult<Void>> registerDefaultConstant(final RegisterDefaultConstantInput input) {
336         return null;
337     }
338
339     @Override
340     public Future<RpcResult<Void>> unregisterConstant() {
341
342         if (globalGetConstantRegistration == null) {
343             final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "missing-registration",
344                     "No get-constant rpc registration present.");
345             final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
346             return Futures.immediateFuture(result);
347         }
348
349         globalGetConstantRegistration.close();
350         globalGetConstantRegistration = null;
351
352         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
353     }
354
355     @Override
356     public Future<RpcResult<UnregisterFlappingSingletonOutput>> unregisterFlappingSingleton() {
357         LOG.debug("unregister-flapping-singleton received.");
358
359         if (flappingSingletonService == null) {
360             final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "missing-registration",
361                     "No flapping-singleton registration present.");
362             final RpcResult<UnregisterFlappingSingletonOutput> result =
363                     RpcResultBuilder.<UnregisterFlappingSingletonOutput>failed().withRpcError(rpcError).build();
364             return Futures.immediateFuture(result);
365         }
366
367         final long flapCount = flappingSingletonService.setInactive();
368         flappingSingletonService = null;
369
370         final UnregisterFlappingSingletonOutput output =
371                 new UnregisterFlappingSingletonOutputBuilder().setFlapCount(flapCount).build();
372
373         return Futures.immediateFuture(RpcResultBuilder.success(output).build());
374     }
375
376     @Override
377     public Future<RpcResult<Void>> addShardReplica(final AddShardReplicaInput input) {
378         return null;
379     }
380
381     @Override
382     public Future<RpcResult<Void>> subscribeDdtl() {
383
384         if (ddtlReg != null) {
385             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
386                     "There is already dataTreeChangeListener registered on id-ints list.");
387             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
388         }
389
390         idIntsDdtl = new IdIntsDOMDataTreeLIstener();
391
392         try {
393             ddtlReg = domDataTreeService.registerListener(idIntsDdtl,
394                     Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION,
395                             ProduceTransactionsHandler.ID_INT_YID)),
396                     true, Collections.emptyList());
397         } catch (DOMDataTreeLoopException e) {
398             LOG.error("Failed to register DOMDataTreeListener.", e);
399
400         }
401
402         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
403     }
404
405     @Override
406     public Future<RpcResult<Void>> registerBoundConstant(final RegisterBoundConstantInput input) {
407         LOG.debug("register-bound-constant: {}", input);
408
409         if (input.getContext() == null) {
410             final RpcError error = RpcResultBuilder.newError(
411                     ErrorType.RPC, "Invalid input.", "Context value is null");
412             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
413         }
414
415         if (input.getConstant() == null) {
416             final RpcError error = RpcResultBuilder.newError(
417                     ErrorType.RPC, "Invalid input.", "Constant value is null");
418             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
419         }
420
421         if (routedRegistrations.containsKey(input.getContext())) {
422             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
423                     "There is already a rpc registered for context: " + input.getContext());
424             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
425         }
426
427         final DOMRpcImplementationRegistration<RoutedGetConstantService> rpcRegistration =
428                 RoutedGetConstantService.registerNew(bindingNormalizedNodeSerializer, domRpcService,
429                         input.getConstant(), input.getContext());
430
431         routedRegistrations.put(input.getContext(), rpcRegistration);
432         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
433     }
434
435     @Override
436     public Future<RpcResult<Void>> registerFlappingSingleton() {
437         LOG.debug("Received register-flapping-singleton.");
438
439         if (flappingSingletonService != null) {
440             final RpcError error = RpcResultBuilder.newError(
441                     ErrorType.RPC, "Registration present.", "flapping-singleton already registered");
442             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
443         }
444
445         flappingSingletonService = new FlappingSingletonService(singletonService);
446
447         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
448     }
449
450     @Override
451     public Future<RpcResult<UnsubscribeDtclOutput>> unsubscribeDtcl() {
452         LOG.debug("Received unsubscribe-dtcl");
453
454         if (idIntsListener == null || dtclReg == null) {
455             final RpcError error = RpcResultBuilder.newError(
456                     ErrorType.RPC, "Dtcl missing.", "No DataTreeChangeListener registered.");
457             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
458                     .withRpcError(error).build());
459         }
460
461         try {
462             idIntsListener.tryFinishProcessing().get(120, TimeUnit.SECONDS);
463         } catch (InterruptedException | ExecutionException | TimeoutException e) {
464             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "resource-denied-transport",
465                     "Unable to finish notification processing in 120 seconds.", "clustering-it", "clustering-it", e);
466             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
467                     .withRpcError(error).build());
468         }
469
470         dtclReg.close();
471         dtclReg = null;
472
473         if (!idIntsListener.hasTriggered()) {
474             final RpcError error = RpcResultBuilder.newError(
475                     ErrorType.APPLICATION, "No notification received.", "id-ints listener has not received"
476                             + "any notifications.");
477             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
478                     .withRpcError(error).build());
479         }
480
481         final DOMDataReadOnlyTransaction rTx = domDataBroker.newReadOnlyTransaction();
482         try {
483             final Optional<NormalizedNode<?, ?>> readResult =
484                     rTx.read(CONTROLLER_CONFIG, WriteTransactionsHandler.ID_INT_YID).checkedGet();
485
486             if (!readResult.isPresent()) {
487                 final RpcError error = RpcResultBuilder.newError(
488                         ErrorType.APPLICATION, "Final read empty.", "No data read from id-ints list.");
489                 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
490                         .withRpcError(error).build());
491             }
492
493             return Futures.immediateFuture(
494                     RpcResultBuilder.success(new UnsubscribeDtclOutputBuilder()
495                             .setCopyMatches(idIntsListener.checkEqual(readResult.get()))).build());
496
497         } catch (final ReadFailedException e) {
498             final RpcError error = RpcResultBuilder.newError(
499                     ErrorType.APPLICATION, "Read failed.", "Final read from id-ints failed.");
500             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
501                     .withRpcError(error).build());
502
503         }
504     }
505
506     @Override
507     public Future<RpcResult<Void>> createPrefixShard(final CreatePrefixShardInput input) {
508         LOG.debug("create-prefix-shard, input: {}", input);
509
510         return prefixShardHandler.onCreatePrefixShard(input);
511     }
512
513     @Override
514     public Future<RpcResult<Void>> deconfigureIdIntsShard() {
515         return null;
516     }
517
518     @Override
519     public Future<RpcResult<UnsubscribeYnlOutput>> unsubscribeYnl(final UnsubscribeYnlInput input) {
520         LOG.debug("Received unsubscribe-ynl, input: {}", input);
521
522         if (!ynlRegistrations.containsKey(input.getId())) {
523             final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "missing-registration",
524                     "No ynl listener with this id registered.");
525             final RpcResult<UnsubscribeYnlOutput> result =
526                     RpcResultBuilder.<UnsubscribeYnlOutput>failed().withRpcError(rpcError).build();
527             return Futures.immediateFuture(result);
528         }
529
530         final ListenerRegistration<YnlListener> reg = ynlRegistrations.remove(input.getId());
531         final UnsubscribeYnlOutput output = reg.getInstance().getOutput();
532
533         reg.close();
534
535         return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeYnlOutput>success().withResult(output).build());
536     }
537
538     @Override
539     public Future<RpcResult<CheckPublishNotificationsOutput>> checkPublishNotifications(
540             final CheckPublishNotificationsInput input) {
541
542         final PublishNotificationsTask task = publishNotificationsTasks.get(input.getId());
543
544         if (task == null) {
545             return Futures.immediateFuture(RpcResultBuilder.success(
546                     new CheckPublishNotificationsOutputBuilder().setActive(false)).build());
547         }
548
549         final CheckPublishNotificationsOutputBuilder checkPublishNotificationsOutputBuilder =
550                 new CheckPublishNotificationsOutputBuilder().setActive(!task.isFinished());
551
552         if (task.getLastError() != null) {
553             LOG.error("Last error for {}", task, task.getLastError());
554             checkPublishNotificationsOutputBuilder.setLastError(task.getLastError().toString());
555         }
556
557         final CheckPublishNotificationsOutput output =
558                 checkPublishNotificationsOutputBuilder.setPublishCount(task.getCurrentNotif()).build();
559
560         return Futures.immediateFuture(RpcResultBuilder.success(output).build());
561     }
562
563     @Override
564     public Future<RpcResult<ProduceTransactionsOutput>> produceTransactions(final ProduceTransactionsInput input) {
565         LOG.debug("producer-transactions, input: {}", input);
566         return ProduceTransactionsHandler.start(domDataTreeService, input);
567     }
568
569     @Override
570     public Future<RpcResult<Void>> shutdownShardReplica(final ShutdownShardReplicaInput input) {
571         LOG.debug("Received shutdown-shard-replica rpc, input: {}", input);
572
573         final String shardName = input.getShardName();
574         if (Strings.isNullOrEmpty(shardName)) {
575             final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "bad-element",
576                     "A valid shard name must be specified");
577             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(rpcError).build());
578         }
579
580         return shutdownShardGracefully(shardName);
581     }
582
583     @Override
584     public Future<RpcResult<Void>> shutdownPrefixShardReplica(final ShutdownPrefixShardReplicaInput input) {
585         LOG.debug("Received shutdown-prefix-shard-replica rpc, input: {}", input);
586
587         final InstanceIdentifier<?> shardPrefix = input.getPrefix();
588
589         if (shardPrefix == null) {
590             final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "bad-element",
591                     "A valid shard prefix must be specified");
592             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(rpcError).build());
593         }
594
595         final YangInstanceIdentifier shardPath = bindingNormalizedNodeSerializer.toYangInstanceIdentifier(shardPrefix);
596         final String cleanPrefixShardName = ClusterUtils.getCleanShardName(shardPath);
597
598         return shutdownShardGracefully(cleanPrefixShardName);
599     }
600
601     private SettableFuture<RpcResult<Void>> shutdownShardGracefully(final String shardName) {
602         final SettableFuture<RpcResult<Void>> rpcResult = SettableFuture.create();
603         final ActorContext context = configDataStore.getActorContext();
604
605         long timeoutInMS = Math.max(context.getDatastoreContext().getShardRaftConfig()
606                 .getElectionTimeOutInterval().$times(3).toMillis(), 10000);
607         final FiniteDuration duration = FiniteDuration.apply(timeoutInMS, TimeUnit.MILLISECONDS);
608         final scala.concurrent.Promise<Boolean> shutdownShardAsk = akka.dispatch.Futures.promise();
609
610         context.findLocalShardAsync(shardName).onComplete(new OnComplete<ActorRef>() {
611             @Override
612             public void onComplete(final Throwable throwable, final ActorRef actorRef) throws Throwable {
613                 if (throwable != null) {
614                     shutdownShardAsk.failure(throwable);
615                 } else {
616                     shutdownShardAsk.completeWith(Patterns.gracefulStop(actorRef, duration, Shutdown.INSTANCE));
617                 }
618             }
619         }, context.getClientDispatcher());
620
621         shutdownShardAsk.future().onComplete(new OnComplete<Boolean>() {
622             @Override
623             public void onComplete(final Throwable throwable, final Boolean gracefulStopResult) throws Throwable {
624                 if (throwable != null) {
625                     final RpcResult<Void> failedResult = RpcResultBuilder.<Void>failed()
626                             .withError(ErrorType.APPLICATION, "Failed to gracefully shutdown shard", throwable).build();
627                     rpcResult.set(failedResult);
628                 } else {
629                     // according to Patterns.gracefulStop API, we don't have to
630                     // check value of gracefulStopResult
631                     rpcResult.set(RpcResultBuilder.<Void>success().build());
632                 }
633             }
634         }, context.getClientDispatcher());
635         return rpcResult;
636     }
637
638     @Override
639     public Future<RpcResult<Void>> registerConstant(final RegisterConstantInput input) {
640
641         LOG.debug("Received register-constant rpc, input: {}", input);
642
643         if (input.getConstant() == null) {
644             final RpcError error = RpcResultBuilder.newError(
645                     ErrorType.RPC, "Invalid input.", "Constant value is null");
646             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
647         }
648
649         if (globalGetConstantRegistration != null) {
650             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
651                     "There is already a get-constant rpc registered.");
652             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
653         }
654
655         globalGetConstantRegistration = GetConstantService.registerNew(domRpcService, input.getConstant());
656         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
657     }
658
659     @Override
660     public Future<RpcResult<Void>> unregisterDefaultConstant() {
661         return null;
662     }
663
664     @Override
665     @SuppressWarnings("checkstyle:IllegalCatch")
666     public Future<RpcResult<UnsubscribeDdtlOutput>> unsubscribeDdtl() {
667         LOG.debug("Received unsubscribe-ddtl.");
668
669         if (idIntsDdtl == null || ddtlReg == null) {
670             final RpcError error = RpcResultBuilder.newError(
671                     ErrorType.RPC, "Ddtl missing.", "No DOMDataTreeListener registered.");
672             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
673                     .withRpcError(error).build());
674         }
675
676         try {
677             idIntsDdtl.tryFinishProcessing().get(120, TimeUnit.SECONDS);
678         } catch (InterruptedException | ExecutionException | TimeoutException e) {
679             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "resource-denied-transport",
680                     "Unable to finish notification processing in 120 seconds.", "clustering-it", "clustering-it", e);
681             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
682                     .withRpcError(error).build());
683         }
684
685         ddtlReg.close();
686         ddtlReg = null;
687
688         if (!idIntsDdtl.hasTriggered()) {
689             final RpcError error = RpcResultBuilder.newError(
690                     ErrorType.APPLICATION, "No notification received.", "id-ints listener has not received"
691                             + "any notifications.");
692             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
693                     .withRpcError(error).build());
694         }
695
696         final String shardName = ClusterUtils.getCleanShardName(ProduceTransactionsHandler.ID_INTS_YID);
697         LOG.debug("Creating distributed datastore client for shard {}", shardName);
698
699         final ActorContext actorContext = configDataStore.getActorContext();
700         final Props distributedDataStoreClientProps =
701                 SimpleDataStoreClientActor.props(actorContext.getCurrentMemberName(),
702                         "Shard-" + shardName, actorContext, shardName);
703
704         final ActorRef clientActor = actorSystem.actorOf(distributedDataStoreClientProps);
705         final DataStoreClient distributedDataStoreClient;
706         try {
707             distributedDataStoreClient = SimpleDataStoreClientActor
708                     .getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS);
709         } catch (RuntimeException e) {
710             LOG.error("Failed to get actor for {}", distributedDataStoreClientProps, e);
711             clientActor.tell(PoisonPill.getInstance(), noSender());
712             final RpcError error = RpcResultBuilder.newError(
713                     ErrorType.APPLICATION, "Unable to create ds client for read.",
714                     "Unable to create ds client for read.");
715             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
716                     .withRpcError(error).build());
717         }
718
719         final ClientLocalHistory localHistory = distributedDataStoreClient.createLocalHistory();
720         final ClientTransaction tx = localHistory.createTransaction();
721         final CheckedFuture<Optional<NormalizedNode<?, ?>>,
722                 org.opendaylight.mdsal.common.api.ReadFailedException> read =
723                 tx.read(YangInstanceIdentifier.of(ProduceTransactionsHandler.ID_INT));
724
725         tx.abort();
726         localHistory.close();
727         try {
728             final Optional<NormalizedNode<?, ?>> optional = read.checkedGet();
729             if (!optional.isPresent()) {
730                 LOG.warn("Final read from client is empty.");
731                 final RpcError error = RpcResultBuilder.newError(
732                         ErrorType.APPLICATION, "Read failed.", "Final read from id-ints is empty.");
733                 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
734                         .withRpcError(error).build());
735             }
736
737             return Futures.immediateFuture(
738                     RpcResultBuilder.success(new UnsubscribeDdtlOutputBuilder()
739                             .setCopyMatches(idIntsDdtl.checkEqual(optional.get()))).build());
740
741         } catch (org.opendaylight.mdsal.common.api.ReadFailedException e) {
742             LOG.error("Unable to read data to verify ddtl data.", e);
743             final RpcError error = RpcResultBuilder.newError(
744                     ErrorType.APPLICATION, "Read failed.", "Final read from id-ints failed.");
745             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
746                     .withRpcError(error).build());
747         } finally {
748             distributedDataStoreClient.close();
749             clientActor.tell(PoisonPill.getInstance(), noSender());
750         }
751     }
752 }