BUG 8629: Try to allow notification processing to finish in unsubscribe of listeners.
[controller.git] / opendaylight / md-sal / samples / clustering-test-app / provider / src / main / java / org / opendaylight / controller / clustering / it / provider / MdsalLowLevelTestProvider.java
1 /*
2  * Copyright (c) 2017 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.clustering.it.provider;
10
11 import static akka.actor.ActorRef.noSender;
12
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSystem;
15 import akka.actor.PoisonPill;
16 import akka.actor.Props;
17 import akka.dispatch.OnComplete;
18 import akka.pattern.Patterns;
19 import com.google.common.base.Optional;
20 import com.google.common.base.Strings;
21 import com.google.common.util.concurrent.CheckedFuture;
22 import com.google.common.util.concurrent.Futures;
23 import com.google.common.util.concurrent.SettableFuture;
24 import java.io.PrintWriter;
25 import java.io.StringWriter;
26 import java.util.Collections;
27 import java.util.HashMap;
28 import java.util.Map;
29 import java.util.concurrent.ExecutionException;
30 import java.util.concurrent.Future;
31 import java.util.concurrent.TimeUnit;
32 import java.util.concurrent.TimeoutException;
33 import org.opendaylight.controller.cluster.ActorSystemProvider;
34 import org.opendaylight.controller.cluster.databroker.actors.dds.ClientLocalHistory;
35 import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
36 import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
37 import org.opendaylight.controller.cluster.databroker.actors.dds.SimpleDataStoreClientActor;
38 import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
39 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
40 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
41 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
42 import org.opendaylight.controller.cluster.sharding.DistributedShardFactory;
43 import org.opendaylight.controller.clustering.it.provider.impl.FlappingSingletonService;
44 import org.opendaylight.controller.clustering.it.provider.impl.GetConstantService;
45 import org.opendaylight.controller.clustering.it.provider.impl.IdIntsDOMDataTreeLIstener;
46 import org.opendaylight.controller.clustering.it.provider.impl.IdIntsListener;
47 import org.opendaylight.controller.clustering.it.provider.impl.PrefixLeaderHandler;
48 import org.opendaylight.controller.clustering.it.provider.impl.PrefixShardHandler;
49 import org.opendaylight.controller.clustering.it.provider.impl.ProduceTransactionsHandler;
50 import org.opendaylight.controller.clustering.it.provider.impl.PublishNotificationsTask;
51 import org.opendaylight.controller.clustering.it.provider.impl.RoutedGetConstantService;
52 import org.opendaylight.controller.clustering.it.provider.impl.SingletonGetConstantService;
53 import org.opendaylight.controller.clustering.it.provider.impl.WriteTransactionsHandler;
54 import org.opendaylight.controller.clustering.it.provider.impl.YnlListener;
55 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
56 import org.opendaylight.controller.md.sal.binding.api.NotificationService;
57 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
58 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
59 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
60 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
61 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeService;
62 import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationRegistration;
63 import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService;
64 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
65 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
66 import org.opendaylight.controller.sal.core.api.model.SchemaService;
67 import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
68 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
69 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
70 import org.opendaylight.mdsal.dom.api.DOMDataTreeLoopException;
71 import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
72 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
73 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
74 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.AddShardReplicaInput;
75 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomePrefixLeaderInput;
76 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsInput;
77 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutput;
78 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutputBuilder;
79 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CreatePrefixShardInput;
80 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.IsClientAbortedOutput;
81 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.OdlMdsalLowlevelControlService;
82 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsInput;
83 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutput;
84 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantInput;
85 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantInput;
86 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterDefaultConstantInput;
87 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantInput;
88 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardInput;
89 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemoveShardReplicaInput;
90 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownPrefixShardReplicaInput;
91 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownShardReplicaInput;
92 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsInput;
93 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlInput;
94 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantInput;
95 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutput;
96 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutputBuilder;
97 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutput;
98 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutputBuilder;
99 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutput;
100 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutputBuilder;
101 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlInput;
102 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlOutput;
103 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsInput;
104 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsOutput;
105 import org.opendaylight.yangtools.concepts.ListenerRegistration;
106 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
107 import org.opendaylight.yangtools.yang.common.RpcError;
108 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
109 import org.opendaylight.yangtools.yang.common.RpcResult;
110 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
111 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
112 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
113 import org.slf4j.Logger;
114 import org.slf4j.LoggerFactory;
115 import scala.concurrent.duration.FiniteDuration;
116
117 public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService {
118
    private static final Logger LOG = LoggerFactory.getLogger(MdsalLowLevelTestProvider.class);
    // Controller-API CONFIGURATION datastore type. Spelled out fully-qualified because the mdsal
    // LogicalDatastoreType with the same simple name is the one imported by this file.
    private static final org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType CONTROLLER_CONFIG =
            org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;

    // Collaborators handed in through (or derived in) the constructor.
    private final RpcProviderRegistry rpcRegistry;
    private final BindingAwareBroker.RpcRegistration<OdlMdsalLowlevelControlService> registration;
    private final DistributedShardFactory distributedShardFactory;
    private final DistributedDataStoreInterface configDataStore;
    private final DOMDataTreeService domDataTreeService;
    private final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer;
    private final DOMDataBroker domDataBroker;
    private final NotificationPublishService notificationPublishService;
    private final NotificationService notificationService;
    private final SchemaService schemaService;
    private final ClusterSingletonServiceProvider singletonService;
    private final DOMRpcProviderService domRpcService;
    private final PrefixLeaderHandler prefixLeaderHandler;
    private final PrefixShardHandler prefixShardHandler;
    private final DOMDataTreeChangeService domDataTreeChangeService;
    private final ActorSystem actorSystem;

    // Routed get-constant registrations keyed by their context; filled by registerBoundConstant and
    // drained by unregisterBoundConstant.
    private Map<InstanceIdentifier<?>, DOMRpcImplementationRegistration<RoutedGetConstantService>> routedRegistrations =
            new HashMap<>();

    // Notification listener registrations keyed by listener id; filled by subscribeYnl and drained
    // by unsubscribeYnl.
    private Map<String, ListenerRegistration<YnlListener>> ynlRegistrations = new HashMap<>();

    // Closed and cleared by unregisterConstant.
    private DOMRpcImplementationRegistration<GetConstantService> globalGetConstantRegistration = null;
    // Set by registerSingletonConstant, closed and cleared by unregisterSingletonConstant.
    private ClusterSingletonServiceRegistration getSingletonConstantRegistration;
    // Set by registerFlappingSingleton, cleared by unregisterFlappingSingleton.
    private FlappingSingletonService flappingSingletonService;
    // Data tree change listener and its registration; both set by subscribeDtcl.
    private ListenerRegistration<DOMDataTreeChangeListener> dtclReg;
    private IdIntsListener idIntsListener;
    // Notification publishing tasks keyed by id; filled by startPublishNotifications and queried by
    // checkPublishNotifications.
    private Map<String, PublishNotificationsTask> publishNotificationsTasks = new HashMap<>();
    // DOM data tree listener and its registration; both set by subscribeDdtl.
    private ListenerRegistration<IdIntsDOMDataTreeLIstener> ddtlReg;
    private IdIntsDOMDataTreeLIstener idIntsDdtl;
153
154
155
156     public MdsalLowLevelTestProvider(final RpcProviderRegistry rpcRegistry,
157                                      final DOMRpcProviderService domRpcService,
158                                      final ClusterSingletonServiceProvider singletonService,
159                                      final SchemaService schemaService,
160                                      final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer,
161                                      final NotificationPublishService notificationPublishService,
162                                      final NotificationService notificationService,
163                                      final DOMDataBroker domDataBroker,
164                                      final DOMDataTreeService domDataTreeService,
165                                      final DistributedShardFactory distributedShardFactory,
166                                      final DistributedDataStoreInterface configDataStore,
167                                      final ActorSystemProvider actorSystemProvider) {
168         this.rpcRegistry = rpcRegistry;
169         this.domRpcService = domRpcService;
170         this.singletonService = singletonService;
171         this.schemaService = schemaService;
172         this.bindingNormalizedNodeSerializer = bindingNormalizedNodeSerializer;
173         this.notificationPublishService = notificationPublishService;
174         this.notificationService = notificationService;
175         this.domDataBroker = domDataBroker;
176         this.domDataTreeService = domDataTreeService;
177         this.distributedShardFactory = distributedShardFactory;
178         this.configDataStore = configDataStore;
179         this.actorSystem = actorSystemProvider.getActorSystem();
180
181         this.prefixLeaderHandler = new PrefixLeaderHandler(domDataTreeService, bindingNormalizedNodeSerializer);
182
183         domDataTreeChangeService =
184                 (DOMDataTreeChangeService) domDataBroker.getSupportedExtensions().get(DOMDataTreeChangeService.class);
185
186         registration = rpcRegistry.addRpcImplementation(OdlMdsalLowlevelControlService.class, this);
187
188         prefixShardHandler = new PrefixShardHandler(distributedShardFactory, domDataTreeService,
189                 bindingNormalizedNodeSerializer);
190     }
191
192     @Override
193     public Future<RpcResult<Void>> unregisterSingletonConstant() {
194         LOG.debug("unregister-singleton-constant");
195
196         if (getSingletonConstantRegistration == null) {
197             LOG.debug("No get-singleton-constant registration present.");
198             final RpcError rpcError = RpcResultBuilder
199                     .newError(ErrorType.APPLICATION, "missing-registration", "No get-singleton-constant rpc registration present.");
200             final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
201             return Futures.immediateFuture(result);
202         }
203
204         try {
205             getSingletonConstantRegistration.close();
206             getSingletonConstantRegistration = null;
207
208             return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
209         } catch (final Exception e) {
210             LOG.debug("There was a problem closing the singleton constant service", e);
211             final RpcError rpcError = RpcResultBuilder
212                     .newError(ErrorType.APPLICATION, "error-closing", "There was a problem closing get-singleton-constant");
213             final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
214             return Futures.immediateFuture(result);
215         }
216     }
217
218     @Override
219     public Future<RpcResult<Void>> startPublishNotifications(final StartPublishNotificationsInput input) {
220         LOG.debug("publish-notifications, input: {}", input);
221
222         final PublishNotificationsTask task = new PublishNotificationsTask(notificationPublishService, input.getId(),
223                 input.getSeconds(), input.getNotificationsPerSecond());
224
225         publishNotificationsTasks.put(input.getId(), task);
226
227         task.start();
228
229         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
230     }
231
232     @Override
233     public Future<RpcResult<Void>> subscribeDtcl() {
234
235         if (dtclReg != null) {
236             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
237                     "There is already dataTreeChangeListener registered on id-ints list.");
238             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
239         }
240
241         idIntsListener = new IdIntsListener();
242
243         dtclReg = domDataTreeChangeService
244                 .registerDataTreeChangeListener(
245                         new org.opendaylight.controller.md.sal.dom.api.DOMDataTreeIdentifier(
246                                 CONTROLLER_CONFIG, WriteTransactionsHandler.ID_INT_YID),
247                         idIntsListener);
248
249         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
250     }
251
252     @Override
253     public Future<RpcResult<WriteTransactionsOutput>> writeTransactions(final WriteTransactionsInput input) {
254         LOG.debug("write-transactions, input: {}", input);
255
256         final WriteTransactionsHandler writeTransactionsHandler = new WriteTransactionsHandler(domDataBroker, input);
257
258         final SettableFuture<RpcResult<WriteTransactionsOutput>> settableFuture = SettableFuture.create();
259         writeTransactionsHandler.start(settableFuture);
260
261         return settableFuture;
262     }
263
264     @Override
265     public Future<RpcResult<IsClientAbortedOutput>> isClientAborted() {
266         return null;
267     }
268
269     @Override
270     public Future<RpcResult<Void>> removeShardReplica(RemoveShardReplicaInput input) {
271         return null;
272     }
273
274     @Override
275     public Future<RpcResult<Void>> subscribeYnl(final SubscribeYnlInput input) {
276
277         LOG.debug("subscribe-ynl, input: {}", input);
278
279         if (ynlRegistrations.containsKey(input.getId())) {
280             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
281                     "There is already ynl listener registered for this id: " + input.getId());
282             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
283         }
284
285         ynlRegistrations.put(input.getId(),
286                 notificationService.registerNotificationListener(new YnlListener(input.getId())));
287
288         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
289     }
290
291     @Override
292     public Future<RpcResult<Void>> removePrefixShard(final RemovePrefixShardInput input) {
293         LOG.debug("remove-prefix-shard, input: {}", input);
294
295         return prefixShardHandler.onRemovePrefixShard(input);
296     }
297
298     @Override
299     public Future<RpcResult<Void>> becomePrefixLeader(final BecomePrefixLeaderInput input) {
300         LOG.debug("become-prefix-leader, input: {}", input);
301
302         return prefixLeaderHandler.makeLeaderLocal(input);
303     }
304
305     @Override
306     public Future<RpcResult<Void>> unregisterBoundConstant(final UnregisterBoundConstantInput input) {
307         LOG.debug("unregister-bound-constant, {}", input);
308
309         final DOMRpcImplementationRegistration<RoutedGetConstantService> registration =
310                 routedRegistrations.remove(input.getContext());
311
312         if (registration == null) {
313             LOG.debug("No get-contexted-constant registration for context: {}", input.getContext());
314             final RpcError rpcError = RpcResultBuilder
315                     .newError(ErrorType.APPLICATION, "missing-registration", "No get-constant rpc registration present.");
316             final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
317             return Futures.immediateFuture(result);
318         }
319
320         registration.close();
321         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
322     }
323
324     @Override
325     public Future<RpcResult<Void>> registerSingletonConstant(final RegisterSingletonConstantInput input) {
326
327         LOG.debug("Received register-singleton-constant rpc, input: {}", input);
328
329         if (input.getConstant() == null) {
330             final RpcError error = RpcResultBuilder.newError(
331                     ErrorType.RPC, "Invalid input.", "Constant value is null");
332             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
333         }
334
335         getSingletonConstantRegistration =
336                 SingletonGetConstantService.registerNew(singletonService, domRpcService, input.getConstant());
337
338         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
339     }
340
341     @Override
342     public Future<RpcResult<Void>> registerDefaultConstant(RegisterDefaultConstantInput input) {
343         return null;
344     }
345
346     @Override
347     public Future<RpcResult<Void>> unregisterConstant() {
348
349         if (globalGetConstantRegistration == null) {
350             final RpcError rpcError = RpcResultBuilder
351                     .newError(ErrorType.APPLICATION, "missing-registration", "No get-constant rpc registration present.");
352             final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
353             return Futures.immediateFuture(result);
354         }
355
356         globalGetConstantRegistration.close();
357         globalGetConstantRegistration = null;
358
359         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
360     }
361
362     @Override
363     public Future<RpcResult<UnregisterFlappingSingletonOutput>> unregisterFlappingSingleton() {
364         LOG.debug("unregister-flapping-singleton received.");
365
366         if (flappingSingletonService == null) {
367             final RpcError rpcError = RpcResultBuilder
368                     .newError(ErrorType.APPLICATION, "missing-registration", "No flapping-singleton registration present.");
369             final RpcResult<UnregisterFlappingSingletonOutput> result =
370                     RpcResultBuilder.<UnregisterFlappingSingletonOutput>failed().withRpcError(rpcError).build();
371             return Futures.immediateFuture(result);
372         }
373
374         final long flapCount = flappingSingletonService.setInactive();
375         flappingSingletonService = null;
376
377         final UnregisterFlappingSingletonOutput output =
378                 new UnregisterFlappingSingletonOutputBuilder().setFlapCount(flapCount).build();
379
380         return Futures.immediateFuture(RpcResultBuilder.success(output).build());
381     }
382
383     @Override
384     public Future<RpcResult<Void>> addShardReplica(AddShardReplicaInput input) {
385         return null;
386     }
387
388     @Override
389     public Future<RpcResult<Void>> subscribeDdtl() {
390
391         if (ddtlReg != null) {
392             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
393                     "There is already dataTreeChangeListener registered on id-ints list.");
394             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
395         }
396
397         idIntsDdtl = new IdIntsDOMDataTreeLIstener();
398
399         try {
400             ddtlReg =
401                     domDataTreeService.registerListener(idIntsDdtl,
402                             Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION,
403                                     ProduceTransactionsHandler.ID_INT_YID))
404                             , true, Collections.emptyList());
405         } catch (DOMDataTreeLoopException e) {
406             LOG.error("Failed to register DOMDataTreeListener.", e);
407
408         }
409
410         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
411     }
412
413     @Override
414     public Future<RpcResult<Void>> registerBoundConstant(final RegisterBoundConstantInput input) {
415         LOG.debug("register-bound-constant: {}", input);
416
417         if (input.getContext() == null) {
418             final RpcError error = RpcResultBuilder.newError(
419                     ErrorType.RPC, "Invalid input.", "Context value is null");
420             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
421         }
422
423         if (input.getConstant() == null) {
424             final RpcError error = RpcResultBuilder.newError(
425                     ErrorType.RPC, "Invalid input.", "Constant value is null");
426             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
427         }
428
429         if (routedRegistrations.containsKey(input.getContext())) {
430             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
431                     "There is already a rpc registered for context: " + input.getContext());
432             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
433         }
434
435         final DOMRpcImplementationRegistration<RoutedGetConstantService> registration =
436                 RoutedGetConstantService.registerNew(bindingNormalizedNodeSerializer, domRpcService,
437                         input.getConstant(), input.getContext());
438
439         routedRegistrations.put(input.getContext(), registration);
440         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
441     }
442
443     @Override
444     public Future<RpcResult<Void>> registerFlappingSingleton() {
445         LOG.debug("Received register-flapping-singleton.");
446
447         if (flappingSingletonService != null) {
448             final RpcError error = RpcResultBuilder.newError(
449                     ErrorType.RPC, "Registration present.", "flapping-singleton already registered");
450             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
451         }
452
453         flappingSingletonService = new FlappingSingletonService(singletonService);
454
455         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
456     }
457
458     @Override
459     public Future<RpcResult<UnsubscribeDtclOutput>> unsubscribeDtcl() {
460         LOG.debug("Received unsubscribe-dtcl");
461
462         if (idIntsListener == null || dtclReg == null) {
463             final RpcError error = RpcResultBuilder.newError(
464                     ErrorType.RPC, "Dtcl missing.", "No DataTreeChangeListener registered.");
465             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed().withRpcError(error).build());
466         }
467
468         try {
469             idIntsListener.tryFinishProcessing().get(120, TimeUnit.SECONDS);
470         } catch (InterruptedException | ExecutionException | TimeoutException e) {
471             final RpcError error = RpcResultBuilder.newError(
472                     ErrorType.RPC, "resource-denied-transport", "Unable to finish notification processing in 120 seconds.",
473                     "clustering-it", "clustering-it", e);
474             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
475                     .withRpcError(error).build());
476         }
477
478         dtclReg.close();
479         dtclReg = null;
480
481         if (!idIntsListener.hasTriggered()) {
482             final RpcError error = RpcResultBuilder.newError(
483                     ErrorType.APPLICATION, "No notification received.", "id-ints listener has not received" +
484                             "any notifications.");
485             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
486                     .withRpcError(error).build());
487         }
488
489         final DOMDataReadOnlyTransaction rTx = domDataBroker.newReadOnlyTransaction();
490         try {
491             final Optional<NormalizedNode<?, ?>> readResult =
492                     rTx.read(CONTROLLER_CONFIG, WriteTransactionsHandler.ID_INT_YID).checkedGet();
493
494             if (!readResult.isPresent()) {
495                 final RpcError error = RpcResultBuilder.newError(
496                         ErrorType.APPLICATION, "Final read empty.", "No data read from id-ints list.");
497                 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
498                         .withRpcError(error).build());
499             }
500
501             return Futures.immediateFuture(
502                     RpcResultBuilder.success(new UnsubscribeDtclOutputBuilder()
503                             .setCopyMatches(idIntsListener.checkEqual(readResult.get()))).build());
504
505         } catch (final ReadFailedException e) {
506             final RpcError error = RpcResultBuilder.newError(
507                     ErrorType.APPLICATION, "Read failed.", "Final read from id-ints failed.");
508             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
509                     .withRpcError(error).build());
510
511         }
512     }
513
514     @Override
515     public Future<RpcResult<Void>> createPrefixShard(final CreatePrefixShardInput input) {
516         LOG.debug("create-prefix-shard, input: {}", input);
517
518         return prefixShardHandler.onCreatePrefixShard(input);
519     }
520
521     @Override
522     public Future<RpcResult<Void>> deconfigureIdIntsShard() {
523         return null;
524     }
525
526     @Override
527     public Future<RpcResult<UnsubscribeYnlOutput>> unsubscribeYnl(final UnsubscribeYnlInput input) {
528         LOG.debug("Received unsubscribe-ynl, input: {}", input);
529
530         if (!ynlRegistrations.containsKey(input.getId())) {
531             final RpcError rpcError = RpcResultBuilder
532                     .newError(ErrorType.APPLICATION, "missing-registration", "No ynl listener with this id registered.");
533             final RpcResult<UnsubscribeYnlOutput> result =
534                     RpcResultBuilder.<UnsubscribeYnlOutput>failed().withRpcError(rpcError).build();
535             return Futures.immediateFuture(result);
536         }
537
538         final ListenerRegistration<YnlListener> registration = ynlRegistrations.remove(input.getId());
539         final UnsubscribeYnlOutput output = registration.getInstance().getOutput();
540
541         registration.close();
542
543         return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeYnlOutput>success().withResult(output).build());
544     }
545
546     @Override
547     public Future<RpcResult<CheckPublishNotificationsOutput>> checkPublishNotifications(
548             final CheckPublishNotificationsInput input) {
549
550         final PublishNotificationsTask task = publishNotificationsTasks.get(input.getId());
551
552         if (task == null) {
553             return Futures.immediateFuture(RpcResultBuilder.success(
554                     new CheckPublishNotificationsOutputBuilder().setActive(false)).build());
555         }
556
557         final CheckPublishNotificationsOutputBuilder checkPublishNotificationsOutputBuilder =
558                 new CheckPublishNotificationsOutputBuilder().setActive(!task.isFinished());
559
560         if (task.getLastError() != null) {
561             final StringWriter sw = new StringWriter();
562             final PrintWriter pw = new PrintWriter(sw);
563             task.getLastError().printStackTrace(pw);
564             checkPublishNotificationsOutputBuilder.setLastError(task.getLastError().toString() + sw.toString());
565         }
566
567         final CheckPublishNotificationsOutput output =
568                 checkPublishNotificationsOutputBuilder.setPublishCount(task.getCurrentNotif()).build();
569
570         return Futures.immediateFuture(RpcResultBuilder.success(output).build());
571     }
572
573     @Override
574     public Future<RpcResult<ProduceTransactionsOutput>> produceTransactions(final ProduceTransactionsInput input) {
575         LOG.debug("producer-transactions, input: {}", input);
576
577         final ProduceTransactionsHandler handler =
578                 new ProduceTransactionsHandler(domDataTreeService, input);
579
580         final SettableFuture<RpcResult<ProduceTransactionsOutput>> settableFuture = SettableFuture.create();
581         handler.start(settableFuture);
582
583         return settableFuture;
584     }
585
586     @Override
587     public Future<RpcResult<Void>> shutdownShardReplica(final ShutdownShardReplicaInput input) {
588         LOG.debug("Received shutdown-shard-replica rpc, input: {}", input);
589
590         final String shardName = input.getShardName();
591         if (Strings.isNullOrEmpty(shardName)) {
592             final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "bad-element",
593                     "A valid shard name must be specified");
594             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(rpcError).build());
595         }
596
597         return shutdownShardGracefully(shardName);
598     }
599
600     @Override
601     public Future<RpcResult<Void>> shutdownPrefixShardReplica(final ShutdownPrefixShardReplicaInput input) {
602         LOG.debug("Received shutdown-prefix-shard-replica rpc, input: {}", input);
603
604         final InstanceIdentifier shardPrefix = input.getPrefix();
605
606         if (shardPrefix == null) {
607             final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "bad-element",
608                     "A valid shard prefix must be specified");
609             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(rpcError).build());
610         }
611
612         final YangInstanceIdentifier shardPath = bindingNormalizedNodeSerializer.toYangInstanceIdentifier(shardPrefix);
613         final String cleanPrefixShardName = ClusterUtils.getCleanShardName(shardPath);
614
615         return shutdownShardGracefully(cleanPrefixShardName);
616     }
617
618     private SettableFuture<RpcResult<Void>> shutdownShardGracefully(final String shardName) {
619         final SettableFuture<RpcResult<Void>> rpcResult = SettableFuture.create();
620         final ActorContext context = configDataStore.getActorContext();
621
622         long timeoutInMS = Math.max(context.getDatastoreContext().getShardRaftConfig()
623                 .getElectionTimeOutInterval().$times(3).toMillis(), 10000);
624         final FiniteDuration duration = FiniteDuration.apply(timeoutInMS, TimeUnit.MILLISECONDS);
625         final scala.concurrent.Promise<Boolean> shutdownShardAsk = akka.dispatch.Futures.promise();
626
627         context.findLocalShardAsync(shardName).onComplete(new OnComplete<ActorRef>() {
628             @Override
629             public void onComplete(final Throwable throwable, final ActorRef actorRef) throws Throwable {
630                 if (throwable != null) {
631                     shutdownShardAsk.failure(throwable);
632                 } else {
633                     shutdownShardAsk.completeWith(Patterns.gracefulStop(actorRef, duration, Shutdown.INSTANCE));
634                 }
635             }
636         }, context.getClientDispatcher());
637
638         shutdownShardAsk.future().onComplete(new OnComplete<Boolean>() {
639             @Override
640             public void onComplete(final Throwable throwable, final Boolean gracefulStopResult) throws Throwable {
641                 if (throwable != null) {
642                     final RpcResult<Void> failedResult = RpcResultBuilder.<Void>failed()
643                             .withError(ErrorType.APPLICATION, "Failed to gracefully shutdown shard", throwable).build();
644                     rpcResult.set(failedResult);
645                 } else {
646                     // according to Patterns.gracefulStop API, we don't have to
647                     // check value of gracefulStopResult
648                     rpcResult.set(RpcResultBuilder.<Void>success().build());
649                 }
650             }
651         }, context.getClientDispatcher());
652         return rpcResult;
653     }
654
655     @Override
656     public Future<RpcResult<Void>> registerConstant(final RegisterConstantInput input) {
657
658         LOG.debug("Received register-constant rpc, input: {}", input);
659
660         if (input.getConstant() == null) {
661             final RpcError error = RpcResultBuilder.newError(
662                     ErrorType.RPC, "Invalid input.", "Constant value is null");
663             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
664         }
665
666         if (globalGetConstantRegistration != null) {
667             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
668                     "There is already a get-constant rpc registered.");
669             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
670         }
671
672         globalGetConstantRegistration = GetConstantService.registerNew(domRpcService, input.getConstant());
673         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
674     }
675
676     @Override
677     public Future<RpcResult<Void>> unregisterDefaultConstant() {
678         return null;
679     }
680
681     @Override
682     public Future<RpcResult<UnsubscribeDdtlOutput>> unsubscribeDdtl() {
683         LOG.debug("Received unsubscribe-ddtl.");
684
685         if (idIntsDdtl == null || ddtlReg == null) {
686             final RpcError error = RpcResultBuilder.newError(
687                     ErrorType.RPC, "Ddtl missing.", "No DOMDataTreeListener registered.");
688             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed().withRpcError(error).build());
689         }
690
691         try {
692             idIntsDdtl.tryFinishProcessing().get(120, TimeUnit.SECONDS);
693         } catch (InterruptedException | ExecutionException | TimeoutException e) {
694             final RpcError error = RpcResultBuilder.newError(
695                     ErrorType.RPC, "resource-denied-transport", "Unable to finish notification processing in 120 seconds.",
696                     "clustering-it", "clustering-it", e);
697             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
698                     .withRpcError(error).build());
699         }
700
701         ddtlReg.close();
702         ddtlReg = null;
703
704         if (!idIntsDdtl.hasTriggered()) {
705             final RpcError error = RpcResultBuilder.newError(
706                     ErrorType.APPLICATION, "No notification received.", "id-ints listener has not received" +
707                             "any notifications.");
708             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
709                     .withRpcError(error).build());
710         }
711
712         final String shardName = ClusterUtils.getCleanShardName(ProduceTransactionsHandler.ID_INTS_YID);
713         LOG.debug("Creating distributed datastore client for shard {}", shardName);
714
715         final ActorContext actorContext = configDataStore.getActorContext();
716         final Props distributedDataStoreClientProps =
717                 SimpleDataStoreClientActor.props(actorContext.getCurrentMemberName(),
718                         "Shard-" + shardName, actorContext, shardName);
719
720         final ActorRef clientActor = actorSystem.actorOf(distributedDataStoreClientProps);
721         final DataStoreClient distributedDataStoreClient;
722         try {
723             distributedDataStoreClient = SimpleDataStoreClientActor
724                     .getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS);
725         } catch (final Exception e) {
726             LOG.error("Failed to get actor for {}", distributedDataStoreClientProps, e);
727             clientActor.tell(PoisonPill.getInstance(), noSender());
728             final RpcError error = RpcResultBuilder.newError(
729                     ErrorType.APPLICATION, "Unable to create ds client for read.",
730                     "Unable to create ds client for read.");
731             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
732                     .withRpcError(error).build());
733         }
734
735         final ClientLocalHistory localHistory = distributedDataStoreClient.createLocalHistory();
736         final ClientTransaction tx = localHistory.createTransaction();
737         final CheckedFuture<Optional<NormalizedNode<?, ?>>,
738                 org.opendaylight.mdsal.common.api.ReadFailedException> read =
739                 tx.read(YangInstanceIdentifier.of(ProduceTransactionsHandler.ID_INT));
740
741         tx.abort();
742         localHistory.close();
743         try {
744             final Optional<NormalizedNode<?, ?>> optional = read.checkedGet();
745             if (!optional.isPresent()) {
746                 LOG.warn("Final read from client is empty.");
747                 final RpcError error = RpcResultBuilder.newError(
748                         ErrorType.APPLICATION, "Read failed.", "Final read from id-ints is empty.");
749                 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
750                         .withRpcError(error).build());
751             }
752
753             return Futures.immediateFuture(
754                     RpcResultBuilder.success(new UnsubscribeDdtlOutputBuilder()
755                             .setCopyMatches(idIntsDdtl.checkEqual(optional.get()))).build());
756
757         } catch (org.opendaylight.mdsal.common.api.ReadFailedException e) {
758             LOG.error("Unable to read data to verify ddtl data.", e);
759             final RpcError error = RpcResultBuilder.newError(
760                     ErrorType.APPLICATION, "Read failed.", "Final read from id-ints failed.");
761             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
762                     .withRpcError(error).build());
763         } finally {
764             distributedDataStoreClient.close();
765             clientActor.tell(PoisonPill.getInstance(), noSender());
766         }
767     }
768 }