BUG 8525: Prevent NPE in test-app listeners
[controller.git] / opendaylight / md-sal / samples / clustering-test-app / provider / src / main / java / org / opendaylight / controller / clustering / it / provider / MdsalLowLevelTestProvider.java
1 /*
2  * Copyright (c) 2017 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.clustering.it.provider;
10
11 import static akka.actor.ActorRef.noSender;
12
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSystem;
15 import akka.actor.PoisonPill;
16 import akka.actor.Props;
17 import com.google.common.base.Optional;
18 import com.google.common.util.concurrent.CheckedFuture;
19 import com.google.common.util.concurrent.Futures;
20 import com.google.common.util.concurrent.SettableFuture;
21 import java.io.PrintWriter;
22 import java.io.StringWriter;
23 import java.util.Collections;
24 import java.util.HashMap;
25 import java.util.Map;
26 import java.util.concurrent.Future;
27 import java.util.concurrent.TimeUnit;
28 import org.opendaylight.controller.cluster.ActorSystemProvider;
29 import org.opendaylight.controller.cluster.databroker.actors.dds.ClientLocalHistory;
30 import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
31 import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
32 import org.opendaylight.controller.cluster.databroker.actors.dds.SimpleDataStoreClientActor;
33 import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
34 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
35 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
36 import org.opendaylight.controller.cluster.sharding.DistributedShardFactory;
37 import org.opendaylight.controller.clustering.it.provider.impl.FlappingSingletonService;
38 import org.opendaylight.controller.clustering.it.provider.impl.GetConstantService;
39 import org.opendaylight.controller.clustering.it.provider.impl.IdIntsDOMDataTreeLIstener;
40 import org.opendaylight.controller.clustering.it.provider.impl.IdIntsListener;
41 import org.opendaylight.controller.clustering.it.provider.impl.PrefixLeaderHandler;
42 import org.opendaylight.controller.clustering.it.provider.impl.PrefixShardHandler;
43 import org.opendaylight.controller.clustering.it.provider.impl.ProduceTransactionsHandler;
44 import org.opendaylight.controller.clustering.it.provider.impl.PublishNotificationsTask;
45 import org.opendaylight.controller.clustering.it.provider.impl.RoutedGetConstantService;
46 import org.opendaylight.controller.clustering.it.provider.impl.SingletonGetConstantService;
47 import org.opendaylight.controller.clustering.it.provider.impl.WriteTransactionsHandler;
48 import org.opendaylight.controller.clustering.it.provider.impl.YnlListener;
49 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
50 import org.opendaylight.controller.md.sal.binding.api.NotificationService;
51 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
52 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
53 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
54 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
55 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeService;
56 import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationRegistration;
57 import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService;
58 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
59 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
60 import org.opendaylight.controller.sal.core.api.model.SchemaService;
61 import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
62 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
63 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
64 import org.opendaylight.mdsal.dom.api.DOMDataTreeLoopException;
65 import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
66 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
67 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
68 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.AddShardReplicaInput;
69 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomePrefixLeaderInput;
70 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsInput;
71 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutput;
72 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutputBuilder;
73 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CreatePrefixShardInput;
74 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.IsClientAbortedOutput;
75 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.OdlMdsalLowlevelControlService;
76 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsInput;
77 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutput;
78 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantInput;
79 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantInput;
80 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterDefaultConstantInput;
81 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantInput;
82 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardInput;
83 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemoveShardReplicaInput;
84 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsInput;
85 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlInput;
86 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantInput;
87 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutput;
88 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutputBuilder;
89 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutput;
90 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutputBuilder;
91 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutput;
92 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutputBuilder;
93 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlInput;
94 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlOutput;
95 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsInput;
96 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsOutput;
97 import org.opendaylight.yangtools.concepts.ListenerRegistration;
98 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
99 import org.opendaylight.yangtools.yang.common.RpcError;
100 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
101 import org.opendaylight.yangtools.yang.common.RpcResult;
102 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
103 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
104 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
105 import org.slf4j.Logger;
106 import org.slf4j.LoggerFactory;
107
108 public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService {
109
110     private static final Logger LOG = LoggerFactory.getLogger(MdsalLowLevelTestProvider.class);
111     private static final org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType CONTROLLER_CONFIG =
112             org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;
113
114     private final RpcProviderRegistry rpcRegistry;
115     private final BindingAwareBroker.RpcRegistration<OdlMdsalLowlevelControlService> registration;
116     private final DistributedShardFactory distributedShardFactory;
117     private final DistributedDataStoreInterface configDataStore;
118     private final DOMDataTreeService domDataTreeService;
119     private final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer;
120     private final DOMDataBroker domDataBroker;
121     private final NotificationPublishService notificationPublishService;
122     private final NotificationService notificationService;
123     private final SchemaService schemaService;
124     private final ClusterSingletonServiceProvider singletonService;
125     private final DOMRpcProviderService domRpcService;
126     private final PrefixLeaderHandler prefixLeaderHandler;
127     private final PrefixShardHandler prefixShardHandler;
128     private final DOMDataTreeChangeService domDataTreeChangeService;
129     private final ActorSystem actorSystem;
130
131     private Map<InstanceIdentifier<?>, DOMRpcImplementationRegistration<RoutedGetConstantService>> routedRegistrations =
132             new HashMap<>();
133
134     private Map<String, ListenerRegistration<YnlListener>> ynlRegistrations = new HashMap<>();
135
136     private DOMRpcImplementationRegistration<GetConstantService> globalGetConstantRegistration = null;
137     private ClusterSingletonServiceRegistration getSingletonConstantRegistration;
138     private FlappingSingletonService flappingSingletonService;
139     private ListenerRegistration<DOMDataTreeChangeListener> dtclReg;
140     private IdIntsListener idIntsListener;
141     private Map<String, PublishNotificationsTask> publishNotificationsTasks = new HashMap<>();
142     private ListenerRegistration<IdIntsDOMDataTreeLIstener> ddtlReg;
143     private IdIntsDOMDataTreeLIstener idIntsDdtl;
144
145
146
147     public MdsalLowLevelTestProvider(final RpcProviderRegistry rpcRegistry,
148                                      final DOMRpcProviderService domRpcService,
149                                      final ClusterSingletonServiceProvider singletonService,
150                                      final SchemaService schemaService,
151                                      final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer,
152                                      final NotificationPublishService notificationPublishService,
153                                      final NotificationService notificationService,
154                                      final DOMDataBroker domDataBroker,
155                                      final DOMDataTreeService domDataTreeService,
156                                      final DistributedShardFactory distributedShardFactory,
157                                      final DistributedDataStoreInterface configDataStore,
158                                      final ActorSystemProvider actorSystemProvider) {
159         this.rpcRegistry = rpcRegistry;
160         this.domRpcService = domRpcService;
161         this.singletonService = singletonService;
162         this.schemaService = schemaService;
163         this.bindingNormalizedNodeSerializer = bindingNormalizedNodeSerializer;
164         this.notificationPublishService = notificationPublishService;
165         this.notificationService = notificationService;
166         this.domDataBroker = domDataBroker;
167         this.domDataTreeService = domDataTreeService;
168         this.distributedShardFactory = distributedShardFactory;
169         this.configDataStore = configDataStore;
170         this.actorSystem = actorSystemProvider.getActorSystem();
171
172         this.prefixLeaderHandler = new PrefixLeaderHandler(domDataTreeService, bindingNormalizedNodeSerializer);
173
174         domDataTreeChangeService =
175                 (DOMDataTreeChangeService) domDataBroker.getSupportedExtensions().get(DOMDataTreeChangeService.class);
176
177         registration = rpcRegistry.addRpcImplementation(OdlMdsalLowlevelControlService.class, this);
178
179         prefixShardHandler = new PrefixShardHandler(distributedShardFactory, domDataTreeService,
180                 bindingNormalizedNodeSerializer);
181     }
182
183     @Override
184     public Future<RpcResult<Void>> unregisterSingletonConstant() {
185         LOG.debug("unregister-singleton-constant");
186
187         if (getSingletonConstantRegistration == null) {
188             LOG.debug("No get-singleton-constant registration present.");
189             final RpcError rpcError = RpcResultBuilder
190                     .newError(ErrorType.APPLICATION, "missing-registration", "No get-singleton-constant rpc registration present.");
191             final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
192             return Futures.immediateFuture(result);
193         }
194
195         try {
196             getSingletonConstantRegistration.close();
197             getSingletonConstantRegistration = null;
198
199             return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
200         } catch (final Exception e) {
201             LOG.debug("There was a problem closing the singleton constant service", e);
202             final RpcError rpcError = RpcResultBuilder
203                     .newError(ErrorType.APPLICATION, "error-closing", "There was a problem closing get-singleton-constant");
204             final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
205             return Futures.immediateFuture(result);
206         }
207     }
208
209     @Override
210     public Future<RpcResult<Void>> startPublishNotifications(final StartPublishNotificationsInput input) {
211         LOG.debug("publish-notifications, input: {}", input);
212
213         final PublishNotificationsTask task = new PublishNotificationsTask(notificationPublishService, input.getId(),
214                 input.getSeconds(), input.getNotificationsPerSecond());
215
216         publishNotificationsTasks.put(input.getId(), task);
217
218         task.start();
219
220         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
221     }
222
223     @Override
224     public Future<RpcResult<Void>> subscribeDtcl() {
225
226         if (dtclReg != null) {
227             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
228                     "There is already dataTreeChangeListener registered on id-ints list.");
229             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
230         }
231
232         idIntsListener = new IdIntsListener();
233
234         dtclReg = domDataTreeChangeService
235                 .registerDataTreeChangeListener(
236                         new org.opendaylight.controller.md.sal.dom.api.DOMDataTreeIdentifier(
237                                 CONTROLLER_CONFIG, WriteTransactionsHandler.ID_INT_YID),
238                         idIntsListener);
239
240         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
241     }
242
243     @Override
244     public Future<RpcResult<WriteTransactionsOutput>> writeTransactions(final WriteTransactionsInput input) {
245         LOG.debug("write-transactions, input: {}", input);
246
247         final WriteTransactionsHandler writeTransactionsHandler = new WriteTransactionsHandler(domDataBroker, input);
248
249         final SettableFuture<RpcResult<WriteTransactionsOutput>> settableFuture = SettableFuture.create();
250         writeTransactionsHandler.start(settableFuture);
251
252         return settableFuture;
253     }
254
255     @Override
256     public Future<RpcResult<IsClientAbortedOutput>> isClientAborted() {
257         return null;
258     }
259
260     @Override
261     public Future<RpcResult<Void>> removeShardReplica(RemoveShardReplicaInput input) {
262         return null;
263     }
264
265     @Override
266     public Future<RpcResult<Void>> subscribeYnl(final SubscribeYnlInput input) {
267
268         LOG.debug("subscribe-ynl, input: {}", input);
269
270         if (ynlRegistrations.containsKey(input.getId())) {
271             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
272                     "There is already ynl listener registered for this id: " + input.getId());
273             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
274         }
275
276         ynlRegistrations.put(input.getId(),
277                 notificationService.registerNotificationListener(new YnlListener(input.getId())));
278
279         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
280     }
281
282     @Override
283     public Future<RpcResult<Void>> removePrefixShard(final RemovePrefixShardInput input) {
284         LOG.debug("remove-prefix-shard, input: {}", input);
285
286         return prefixShardHandler.onRemovePrefixShard(input);
287     }
288
289     @Override
290     public Future<RpcResult<Void>> becomePrefixLeader(final BecomePrefixLeaderInput input) {
291         LOG.debug("become-prefix-leader, input: {}", input);
292
293         return prefixLeaderHandler.makeLeaderLocal(input);
294     }
295
296     @Override
297     public Future<RpcResult<Void>> unregisterBoundConstant(final UnregisterBoundConstantInput input) {
298         LOG.debug("unregister-bound-constant, {}", input);
299
300         final DOMRpcImplementationRegistration<RoutedGetConstantService> registration =
301                 routedRegistrations.remove(input.getContext());
302
303         if (registration == null) {
304             LOG.debug("No get-contexted-constant registration for context: {}", input.getContext());
305             final RpcError rpcError = RpcResultBuilder
306                     .newError(ErrorType.APPLICATION, "missing-registration", "No get-constant rpc registration present.");
307             final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
308             return Futures.immediateFuture(result);
309         }
310
311         registration.close();
312         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
313     }
314
315     @Override
316     public Future<RpcResult<Void>> registerSingletonConstant(final RegisterSingletonConstantInput input) {
317
318         LOG.debug("Received register-singleton-constant rpc, input: {}", input);
319
320         if (input.getConstant() == null) {
321             final RpcError error = RpcResultBuilder.newError(
322                     ErrorType.RPC, "Invalid input.", "Constant value is null");
323             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
324         }
325
326         getSingletonConstantRegistration =
327                 SingletonGetConstantService.registerNew(singletonService, domRpcService, input.getConstant());
328
329         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
330     }
331
332     @Override
333     public Future<RpcResult<Void>> registerDefaultConstant(RegisterDefaultConstantInput input) {
334         return null;
335     }
336
337     @Override
338     public Future<RpcResult<Void>> unregisterConstant() {
339
340         if (globalGetConstantRegistration == null) {
341             final RpcError rpcError = RpcResultBuilder
342                     .newError(ErrorType.APPLICATION, "missing-registration", "No get-constant rpc registration present.");
343             final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
344             return Futures.immediateFuture(result);
345         }
346
347         globalGetConstantRegistration.close();
348         globalGetConstantRegistration = null;
349
350         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
351     }
352
353     @Override
354     public Future<RpcResult<UnregisterFlappingSingletonOutput>> unregisterFlappingSingleton() {
355         LOG.debug("unregister-flapping-singleton received.");
356
357         if (flappingSingletonService == null) {
358             final RpcError rpcError = RpcResultBuilder
359                     .newError(ErrorType.APPLICATION, "missing-registration", "No flapping-singleton registration present.");
360             final RpcResult<UnregisterFlappingSingletonOutput> result =
361                     RpcResultBuilder.<UnregisterFlappingSingletonOutput>failed().withRpcError(rpcError).build();
362             return Futures.immediateFuture(result);
363         }
364
365         final long flapCount = flappingSingletonService.setInactive();
366         flappingSingletonService = null;
367
368         final UnregisterFlappingSingletonOutput output =
369                 new UnregisterFlappingSingletonOutputBuilder().setFlapCount(flapCount).build();
370
371         return Futures.immediateFuture(RpcResultBuilder.success(output).build());
372     }
373
374     @Override
375     public Future<RpcResult<Void>> addShardReplica(AddShardReplicaInput input) {
376         return null;
377     }
378
379     @Override
380     public Future<RpcResult<Void>> subscribeDdtl() {
381
382         if (ddtlReg != null) {
383             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
384                     "There is already dataTreeChangeListener registered on id-ints list.");
385             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
386         }
387
388         idIntsDdtl = new IdIntsDOMDataTreeLIstener();
389
390         try {
391             ddtlReg =
392                     domDataTreeService.registerListener(idIntsDdtl,
393                             Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION,
394                                     ProduceTransactionsHandler.ID_INT_YID))
395                             , true, Collections.emptyList());
396         } catch (DOMDataTreeLoopException e) {
397             LOG.error("Failed to register DOMDataTreeListener.", e);
398
399         }
400
401         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
402     }
403
404     @Override
405     public Future<RpcResult<Void>> registerBoundConstant(final RegisterBoundConstantInput input) {
406         LOG.debug("register-bound-constant: {}", input);
407
408         if (input.getContext() == null) {
409             final RpcError error = RpcResultBuilder.newError(
410                     ErrorType.RPC, "Invalid input.", "Context value is null");
411             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
412         }
413
414         if (input.getConstant() == null) {
415             final RpcError error = RpcResultBuilder.newError(
416                     ErrorType.RPC, "Invalid input.", "Constant value is null");
417             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
418         }
419
420         if (routedRegistrations.containsKey(input.getContext())) {
421             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
422                     "There is already a rpc registered for context: " + input.getContext());
423             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
424         }
425
426         final DOMRpcImplementationRegistration<RoutedGetConstantService> registration =
427                 RoutedGetConstantService.registerNew(bindingNormalizedNodeSerializer, domRpcService,
428                         input.getConstant(), input.getContext());
429
430         routedRegistrations.put(input.getContext(), registration);
431         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
432     }
433
434     @Override
435     public Future<RpcResult<Void>> registerFlappingSingleton() {
436         LOG.debug("Received register-flapping-singleton.");
437
438         if (flappingSingletonService != null) {
439             final RpcError error = RpcResultBuilder.newError(
440                     ErrorType.RPC, "Registration present.", "flapping-singleton already registered");
441             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
442         }
443
444         flappingSingletonService = new FlappingSingletonService(singletonService);
445
446         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
447     }
448
449     @Override
450     public Future<RpcResult<UnsubscribeDtclOutput>> unsubscribeDtcl() {
451         LOG.debug("Received unsubscribe-dtcl");
452
453         if (idIntsListener == null || dtclReg == null) {
454             final RpcError error = RpcResultBuilder.newError(
455                     ErrorType.RPC, "Dtcl missing.", "No DataTreeChangeListener registered.");
456             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed().withRpcError(error).build());
457         }
458
459         dtclReg.close();
460         dtclReg = null;
461
462         if (!idIntsListener.hasTriggered()) {
463             final RpcError error = RpcResultBuilder.newError(
464                     ErrorType.APPLICATION, "No notification received.", "id-ints listener has not received" +
465                             "any notifications.");
466             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
467                     .withRpcError(error).build());
468         }
469
470         final DOMDataReadOnlyTransaction rTx = domDataBroker.newReadOnlyTransaction();
471         try {
472             final Optional<NormalizedNode<?, ?>> readResult =
473                     rTx.read(CONTROLLER_CONFIG, WriteTransactionsHandler.ID_INT_YID).checkedGet();
474
475             if (!readResult.isPresent()) {
476                 final RpcError error = RpcResultBuilder.newError(
477                         ErrorType.APPLICATION, "Final read empty.", "No data read from id-ints list.");
478                 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
479                         .withRpcError(error).build());
480             }
481
482             return Futures.immediateFuture(
483                     RpcResultBuilder.success(new UnsubscribeDtclOutputBuilder()
484                             .setCopyMatches(idIntsListener.checkEqual(readResult.get()))).build());
485
486         } catch (final ReadFailedException e) {
487             final RpcError error = RpcResultBuilder.newError(
488                     ErrorType.APPLICATION, "Read failed.", "Final read from id-ints failed.");
489             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
490                     .withRpcError(error).build());
491
492         }
493     }
494
495     @Override
496     public Future<RpcResult<Void>> createPrefixShard(final CreatePrefixShardInput input) {
497         LOG.debug("create-prefix-shard, input: {}", input);
498
499         return prefixShardHandler.onCreatePrefixShard(input);
500     }
501
502     @Override
503     public Future<RpcResult<Void>> deconfigureIdIntsShard() {
504         return null;
505     }
506
507     @Override
508     public Future<RpcResult<UnsubscribeYnlOutput>> unsubscribeYnl(final UnsubscribeYnlInput input) {
509         LOG.debug("Received unsubscribe-ynl, input: {}", input);
510
511         if (!ynlRegistrations.containsKey(input.getId())) {
512             final RpcError rpcError = RpcResultBuilder
513                     .newError(ErrorType.APPLICATION, "missing-registration", "No ynl listener with this id registered.");
514             final RpcResult<UnsubscribeYnlOutput> result =
515                     RpcResultBuilder.<UnsubscribeYnlOutput>failed().withRpcError(rpcError).build();
516             return Futures.immediateFuture(result);
517         }
518
519         final ListenerRegistration<YnlListener> registration = ynlRegistrations.remove(input.getId());
520         final UnsubscribeYnlOutput output = registration.getInstance().getOutput();
521
522         registration.close();
523
524         return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeYnlOutput>success().withResult(output).build());
525     }
526
527     @Override
528     public Future<RpcResult<CheckPublishNotificationsOutput>> checkPublishNotifications(
529             final CheckPublishNotificationsInput input) {
530
531         final PublishNotificationsTask task = publishNotificationsTasks.get(input.getId());
532
533         if (task == null) {
534             return Futures.immediateFuture(RpcResultBuilder.success(
535                     new CheckPublishNotificationsOutputBuilder().setActive(false)).build());
536         }
537
538         final CheckPublishNotificationsOutputBuilder checkPublishNotificationsOutputBuilder =
539                 new CheckPublishNotificationsOutputBuilder().setActive(!task.isFinished());
540
541         if (task.getLastError() != null) {
542             final StringWriter sw = new StringWriter();
543             final PrintWriter pw = new PrintWriter(sw);
544             task.getLastError().printStackTrace(pw);
545             checkPublishNotificationsOutputBuilder.setLastError(task.getLastError().toString() + sw.toString());
546         }
547
548         final CheckPublishNotificationsOutput output =
549                 checkPublishNotificationsOutputBuilder.setPublishCount(task.getCurrentNotif()).build();
550
551         return Futures.immediateFuture(RpcResultBuilder.success(output).build());
552     }
553
554     @Override
555     public Future<RpcResult<ProduceTransactionsOutput>> produceTransactions(final ProduceTransactionsInput input) {
556         LOG.debug("producer-transactions, input: {}", input);
557
558         final ProduceTransactionsHandler handler =
559                 new ProduceTransactionsHandler(domDataTreeService, input);
560
561         final SettableFuture<RpcResult<ProduceTransactionsOutput>> settableFuture = SettableFuture.create();
562         handler.start(settableFuture);
563
564         return settableFuture;
565     }
566
567     @Override
568     public Future<RpcResult<Void>> registerConstant(final RegisterConstantInput input) {
569
570         LOG.debug("Received register-constant rpc, input: {}", input);
571
572         if (input.getConstant() == null) {
573             final RpcError error = RpcResultBuilder.newError(
574                     ErrorType.RPC, "Invalid input.", "Constant value is null");
575             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
576         }
577
578         if (globalGetConstantRegistration != null) {
579             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
580                     "There is already a get-constant rpc registered.");
581             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
582         }
583
584         globalGetConstantRegistration = GetConstantService.registerNew(domRpcService, input.getConstant());
585         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
586     }
587
588     @Override
589     public Future<RpcResult<Void>> unregisterDefaultConstant() {
590         return null;
591     }
592
593     @Override
594     public Future<RpcResult<UnsubscribeDdtlOutput>> unsubscribeDdtl() {
595         LOG.debug("Received unsubscribe-ddtl.");
596
597         if (idIntsDdtl == null || ddtlReg == null) {
598             final RpcError error = RpcResultBuilder.newError(
599                     ErrorType.RPC, "Ddtl missing.", "No DOMDataTreeListener registered.");
600             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed().withRpcError(error).build());
601         }
602
603         ddtlReg.close();
604         ddtlReg = null;
605
606         if (!idIntsDdtl.hasTriggered()) {
607             final RpcError error = RpcResultBuilder.newError(
608                     ErrorType.APPLICATION, "No notification received.", "id-ints listener has not received" +
609                             "any notifications.");
610             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
611                     .withRpcError(error).build());
612         }
613
614         final String shardName = ClusterUtils.getCleanShardName(ProduceTransactionsHandler.ID_INTS_YID);
615         LOG.debug("Creating distributed datastore client for shard {}", shardName);
616
617         final ActorContext actorContext = configDataStore.getActorContext();
618         final Props distributedDataStoreClientProps =
619                 SimpleDataStoreClientActor.props(actorContext.getCurrentMemberName(),
620                         "Shard-" + shardName, actorContext, shardName);
621
622         final ActorRef clientActor = actorSystem.actorOf(distributedDataStoreClientProps);
623         final DataStoreClient distributedDataStoreClient;
624         try {
625             distributedDataStoreClient = SimpleDataStoreClientActor
626                     .getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS);
627         } catch (final Exception e) {
628             LOG.error("Failed to get actor for {}", distributedDataStoreClientProps, e);
629             clientActor.tell(PoisonPill.getInstance(), noSender());
630             final RpcError error = RpcResultBuilder.newError(
631                     ErrorType.APPLICATION, "Unable to create ds client for read.",
632                     "Unable to create ds client for read.");
633             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
634                     .withRpcError(error).build());
635         }
636
637         final ClientLocalHistory localHistory = distributedDataStoreClient.createLocalHistory();
638         final ClientTransaction tx = localHistory.createTransaction();
639         final CheckedFuture<Optional<NormalizedNode<?, ?>>,
640                 org.opendaylight.mdsal.common.api.ReadFailedException> read =
641                 tx.read(YangInstanceIdentifier.of(ProduceTransactionsHandler.ID_INT));
642
643         tx.abort();
644         localHistory.close();
645         try {
646             final Optional<NormalizedNode<?, ?>> optional = read.checkedGet();
647             if (!optional.isPresent()) {
648                 LOG.warn("Final read from client is empty.");
649                 final RpcError error = RpcResultBuilder.newError(
650                         ErrorType.APPLICATION, "Read failed.", "Final read from id-ints is empty.");
651                 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
652                         .withRpcError(error).build());
653             }
654
655             return Futures.immediateFuture(
656                     RpcResultBuilder.success(new UnsubscribeDdtlOutputBuilder()
657                             .setCopyMatches(idIntsDdtl.checkEqual(optional.get()))).build());
658
659         } catch (org.opendaylight.mdsal.common.api.ReadFailedException e) {
660             LOG.error("Unable to read data to verify ddtl data.", e);
661             final RpcError error = RpcResultBuilder.newError(
662                     ErrorType.APPLICATION, "Read failed.", "Final read from id-ints failed.");
663             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
664                     .withRpcError(error).build());
665         } finally {
666             distributedDataStoreClient.close();
667             clientActor.tell(PoisonPill.getInstance(), noSender());
668         }
669     }
670 }