BUG 8462: Switch to using cds-client in usubscribe-ddtl
[controller.git] / opendaylight / md-sal / samples / clustering-test-app / provider / src / main / java / org / opendaylight / controller / clustering / it / provider / MdsalLowLevelTestProvider.java
1 /*
2  * Copyright (c) 2017 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.clustering.it.provider;
10
11 import static akka.actor.ActorRef.noSender;
12
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSystem;
15 import akka.actor.PoisonPill;
16 import akka.actor.Props;
17 import com.google.common.base.Optional;
18 import com.google.common.util.concurrent.CheckedFuture;
19 import com.google.common.util.concurrent.Futures;
20 import com.google.common.util.concurrent.SettableFuture;
21 import java.io.PrintWriter;
22 import java.io.StringWriter;
23 import java.util.Collections;
24 import java.util.HashMap;
25 import java.util.Map;
26 import java.util.concurrent.Future;
27 import java.util.concurrent.TimeUnit;
28 import org.opendaylight.controller.cluster.ActorSystemProvider;
29 import org.opendaylight.controller.cluster.databroker.actors.dds.ClientLocalHistory;
30 import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
31 import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
32 import org.opendaylight.controller.cluster.databroker.actors.dds.SimpleDataStoreClientActor;
33 import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
34 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
35 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
36 import org.opendaylight.controller.cluster.sharding.DistributedShardFactory;
37 import org.opendaylight.controller.clustering.it.provider.impl.FlappingSingletonService;
38 import org.opendaylight.controller.clustering.it.provider.impl.GetConstantService;
39 import org.opendaylight.controller.clustering.it.provider.impl.IdIntsDOMDataTreeLIstener;
40 import org.opendaylight.controller.clustering.it.provider.impl.IdIntsListener;
41 import org.opendaylight.controller.clustering.it.provider.impl.PrefixLeaderHandler;
42 import org.opendaylight.controller.clustering.it.provider.impl.PrefixShardHandler;
43 import org.opendaylight.controller.clustering.it.provider.impl.ProduceTransactionsHandler;
44 import org.opendaylight.controller.clustering.it.provider.impl.PublishNotificationsTask;
45 import org.opendaylight.controller.clustering.it.provider.impl.RoutedGetConstantService;
46 import org.opendaylight.controller.clustering.it.provider.impl.SingletonGetConstantService;
47 import org.opendaylight.controller.clustering.it.provider.impl.WriteTransactionsHandler;
48 import org.opendaylight.controller.clustering.it.provider.impl.YnlListener;
49 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
50 import org.opendaylight.controller.md.sal.binding.api.NotificationService;
51 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
52 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
53 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
54 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
55 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeService;
56 import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationRegistration;
57 import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService;
58 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
59 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
60 import org.opendaylight.controller.sal.core.api.model.SchemaService;
61 import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
62 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
63 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
64 import org.opendaylight.mdsal.dom.api.DOMDataTreeLoopException;
65 import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
66 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
67 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
68 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.AddShardReplicaInput;
69 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomePrefixLeaderInput;
70 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsInput;
71 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutput;
72 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutputBuilder;
73 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CreatePrefixShardInput;
74 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.IsClientAbortedOutput;
75 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.OdlMdsalLowlevelControlService;
76 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsInput;
77 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutput;
78 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantInput;
79 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantInput;
80 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterDefaultConstantInput;
81 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantInput;
82 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardInput;
83 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemoveShardReplicaInput;
84 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsInput;
85 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlInput;
86 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantInput;
87 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutput;
88 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutputBuilder;
89 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutput;
90 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutputBuilder;
91 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutput;
92 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutputBuilder;
93 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlInput;
94 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlOutput;
95 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsInput;
96 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsOutput;
97 import org.opendaylight.yangtools.concepts.ListenerRegistration;
98 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
99 import org.opendaylight.yangtools.yang.common.RpcError;
100 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
101 import org.opendaylight.yangtools.yang.common.RpcResult;
102 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
103 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
104 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
105 import org.slf4j.Logger;
106 import org.slf4j.LoggerFactory;
107
108 public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService {
109
110     private static final Logger LOG = LoggerFactory.getLogger(MdsalLowLevelTestProvider.class);
111     private static final org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType CONTROLLER_CONFIG =
112             org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;
113
114     private final RpcProviderRegistry rpcRegistry;
115     private final BindingAwareBroker.RpcRegistration<OdlMdsalLowlevelControlService> registration;
116     private final DistributedShardFactory distributedShardFactory;
117     private final DistributedDataStoreInterface configDataStore;
118     private final DOMDataTreeService domDataTreeService;
119     private final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer;
120     private final DOMDataBroker domDataBroker;
121     private final NotificationPublishService notificationPublishService;
122     private final NotificationService notificationService;
123     private final SchemaService schemaService;
124     private final ClusterSingletonServiceProvider singletonService;
125     private final DOMRpcProviderService domRpcService;
126     private final PrefixLeaderHandler prefixLeaderHandler;
127     private final PrefixShardHandler prefixShardHandler;
128     private final DOMDataTreeChangeService domDataTreeChangeService;
129     private final ActorSystem actorSystem;
130
131     private Map<InstanceIdentifier<?>, DOMRpcImplementationRegistration<RoutedGetConstantService>> routedRegistrations =
132             new HashMap<>();
133
134     private Map<String, ListenerRegistration<YnlListener>> ynlRegistrations = new HashMap<>();
135
136     private DOMRpcImplementationRegistration<GetConstantService> globalGetConstantRegistration = null;
137     private ClusterSingletonServiceRegistration getSingletonConstantRegistration;
138     private FlappingSingletonService flappingSingletonService;
139     private ListenerRegistration<DOMDataTreeChangeListener> dtclReg;
140     private IdIntsListener idIntsListener;
141     private Map<String, PublishNotificationsTask> publishNotificationsTasks = new HashMap<>();
142     private ListenerRegistration<IdIntsDOMDataTreeLIstener> ddtlReg;
143     private IdIntsDOMDataTreeLIstener idIntsDdtl;
144
145
146
147     public MdsalLowLevelTestProvider(final RpcProviderRegistry rpcRegistry,
148                                      final DOMRpcProviderService domRpcService,
149                                      final ClusterSingletonServiceProvider singletonService,
150                                      final SchemaService schemaService,
151                                      final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer,
152                                      final NotificationPublishService notificationPublishService,
153                                      final NotificationService notificationService,
154                                      final DOMDataBroker domDataBroker,
155                                      final DOMDataTreeService domDataTreeService,
156                                      final DistributedShardFactory distributedShardFactory,
157                                      final DistributedDataStoreInterface configDataStore,
158                                      final ActorSystemProvider actorSystemProvider) {
159         this.rpcRegistry = rpcRegistry;
160         this.domRpcService = domRpcService;
161         this.singletonService = singletonService;
162         this.schemaService = schemaService;
163         this.bindingNormalizedNodeSerializer = bindingNormalizedNodeSerializer;
164         this.notificationPublishService = notificationPublishService;
165         this.notificationService = notificationService;
166         this.domDataBroker = domDataBroker;
167         this.domDataTreeService = domDataTreeService;
168         this.distributedShardFactory = distributedShardFactory;
169         this.configDataStore = configDataStore;
170         this.actorSystem = actorSystemProvider.getActorSystem();
171
172         this.prefixLeaderHandler = new PrefixLeaderHandler(domDataTreeService, bindingNormalizedNodeSerializer);
173
174         domDataTreeChangeService =
175                 (DOMDataTreeChangeService) domDataBroker.getSupportedExtensions().get(DOMDataTreeChangeService.class);
176
177         registration = rpcRegistry.addRpcImplementation(OdlMdsalLowlevelControlService.class, this);
178
179         prefixShardHandler = new PrefixShardHandler(distributedShardFactory, domDataTreeService,
180                 bindingNormalizedNodeSerializer);
181     }
182
183     @Override
184     public Future<RpcResult<Void>> unregisterSingletonConstant() {
185         LOG.debug("unregister-singleton-constant");
186
187         if (getSingletonConstantRegistration == null) {
188             LOG.debug("No get-singleton-constant registration present.");
189             final RpcError rpcError = RpcResultBuilder
190                     .newError(ErrorType.APPLICATION, "missing-registration", "No get-singleton-constant rpc registration present.");
191             final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
192             return Futures.immediateFuture(result);
193         }
194
195         try {
196             getSingletonConstantRegistration.close();
197             getSingletonConstantRegistration = null;
198
199             return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
200         } catch (final Exception e) {
201             LOG.debug("There was a problem closing the singleton constant service", e);
202             final RpcError rpcError = RpcResultBuilder
203                     .newError(ErrorType.APPLICATION, "error-closing", "There was a problem closing get-singleton-constant");
204             final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
205             return Futures.immediateFuture(result);
206         }
207     }
208
209     @Override
210     public Future<RpcResult<Void>> startPublishNotifications(final StartPublishNotificationsInput input) {
211         LOG.debug("publish-notifications, input: {}", input);
212
213         final PublishNotificationsTask task = new PublishNotificationsTask(notificationPublishService, input.getId(),
214                 input.getSeconds(), input.getNotificationsPerSecond());
215
216         publishNotificationsTasks.put(input.getId(), task);
217
218         task.start();
219
220         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
221     }
222
223     @Override
224     public Future<RpcResult<Void>> subscribeDtcl() {
225
226         if (dtclReg != null) {
227             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
228                     "There is already dataTreeChangeListener registered on id-ints list.");
229             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
230         }
231
232         idIntsListener = new IdIntsListener();
233
234         dtclReg = domDataTreeChangeService
235                 .registerDataTreeChangeListener(
236                         new org.opendaylight.controller.md.sal.dom.api.DOMDataTreeIdentifier(
237                                 CONTROLLER_CONFIG, WriteTransactionsHandler.ID_INT_YID),
238                         idIntsListener);
239
240         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
241     }
242
243     @Override
244     public Future<RpcResult<WriteTransactionsOutput>> writeTransactions(final WriteTransactionsInput input) {
245         LOG.debug("write-transactions, input: {}", input);
246
247         final WriteTransactionsHandler writeTransactionsHandler = new WriteTransactionsHandler(domDataBroker, input);
248
249         final SettableFuture<RpcResult<WriteTransactionsOutput>> settableFuture = SettableFuture.create();
250         writeTransactionsHandler.start(settableFuture);
251
252         return settableFuture;
253     }
254
255     @Override
256     public Future<RpcResult<IsClientAbortedOutput>> isClientAborted() {
257         return null;
258     }
259
260     @Override
261     public Future<RpcResult<Void>> removeShardReplica(RemoveShardReplicaInput input) {
262         return null;
263     }
264
265     @Override
266     public Future<RpcResult<Void>> subscribeYnl(final SubscribeYnlInput input) {
267
268         LOG.debug("subscribe-ynl, input: {}", input);
269
270         if (ynlRegistrations.containsKey(input.getId())) {
271             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
272                     "There is already ynl listener registered for this id: " + input.getId());
273             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
274         }
275
276         ynlRegistrations.put(input.getId(),
277                 notificationService.registerNotificationListener(new YnlListener(input.getId())));
278
279         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
280     }
281
282     @Override
283     public Future<RpcResult<Void>> removePrefixShard(final RemovePrefixShardInput input) {
284         LOG.debug("remove-prefix-shard, input: {}", input);
285
286         return prefixShardHandler.onRemovePrefixShard(input);
287     }
288
289     @Override
290     public Future<RpcResult<Void>> becomePrefixLeader(final BecomePrefixLeaderInput input) {
291         LOG.debug("become-prefix-leader, input: {}", input);
292
293         return prefixLeaderHandler.makeLeaderLocal(input);
294     }
295
296     @Override
297     public Future<RpcResult<Void>> unregisterBoundConstant(final UnregisterBoundConstantInput input) {
298         LOG.debug("unregister-bound-constant, {}", input);
299
300         final DOMRpcImplementationRegistration<RoutedGetConstantService> registration =
301                 routedRegistrations.remove(input.getContext());
302
303         if (registration == null) {
304             LOG.debug("No get-contexted-constant registration for context: {}", input.getContext());
305             final RpcError rpcError = RpcResultBuilder
306                     .newError(ErrorType.APPLICATION, "missing-registration", "No get-constant rpc registration present.");
307             final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
308             return Futures.immediateFuture(result);
309         }
310
311         registration.close();
312         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
313     }
314
315     @Override
316     public Future<RpcResult<Void>> registerSingletonConstant(final RegisterSingletonConstantInput input) {
317
318         LOG.debug("Received register-singleton-constant rpc, input: {}", input);
319
320         if (input.getConstant() == null) {
321             final RpcError error = RpcResultBuilder.newError(
322                     ErrorType.RPC, "Invalid input.", "Constant value is null");
323             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
324         }
325
326         getSingletonConstantRegistration =
327                 SingletonGetConstantService.registerNew(singletonService, domRpcService, input.getConstant());
328
329         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
330     }
331
332     @Override
333     public Future<RpcResult<Void>> registerDefaultConstant(RegisterDefaultConstantInput input) {
334         return null;
335     }
336
337     @Override
338     public Future<RpcResult<Void>> unregisterConstant() {
339
340         if (globalGetConstantRegistration == null) {
341             final RpcError rpcError = RpcResultBuilder
342                     .newError(ErrorType.APPLICATION, "missing-registration", "No get-constant rpc registration present.");
343             final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
344             return Futures.immediateFuture(result);
345         }
346
347         globalGetConstantRegistration.close();
348         globalGetConstantRegistration = null;
349
350         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
351     }
352
353     @Override
354     public Future<RpcResult<UnregisterFlappingSingletonOutput>> unregisterFlappingSingleton() {
355         LOG.debug("unregister-flapping-singleton received.");
356
357         if (flappingSingletonService == null) {
358             final RpcError rpcError = RpcResultBuilder
359                     .newError(ErrorType.APPLICATION, "missing-registration", "No flapping-singleton registration present.");
360             final RpcResult<UnregisterFlappingSingletonOutput> result =
361                     RpcResultBuilder.<UnregisterFlappingSingletonOutput>failed().withRpcError(rpcError).build();
362             return Futures.immediateFuture(result);
363         }
364
365         final long flapCount = flappingSingletonService.setInactive();
366         flappingSingletonService = null;
367
368         final UnregisterFlappingSingletonOutput output =
369                 new UnregisterFlappingSingletonOutputBuilder().setFlapCount(flapCount).build();
370
371         return Futures.immediateFuture(RpcResultBuilder.success(output).build());
372     }
373
374     @Override
375     public Future<RpcResult<Void>> addShardReplica(AddShardReplicaInput input) {
376         return null;
377     }
378
379     @Override
380     public Future<RpcResult<Void>> subscribeDdtl() {
381
382         if (ddtlReg != null) {
383             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
384                     "There is already dataTreeChangeListener registered on id-ints list.");
385             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
386         }
387
388         idIntsDdtl = new IdIntsDOMDataTreeLIstener();
389
390         try {
391             ddtlReg =
392                     domDataTreeService.registerListener(idIntsDdtl,
393                             Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION,
394                                     ProduceTransactionsHandler.ID_INT_YID))
395                             , true, Collections.emptyList());
396         } catch (DOMDataTreeLoopException e) {
397             LOG.error("Failed to register DOMDataTreeListener.", e);
398
399         }
400
401         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
402     }
403
404     @Override
405     public Future<RpcResult<Void>> registerBoundConstant(final RegisterBoundConstantInput input) {
406         LOG.debug("register-bound-constant: {}", input);
407
408         if (input.getContext() == null) {
409             final RpcError error = RpcResultBuilder.newError(
410                     ErrorType.RPC, "Invalid input.", "Context value is null");
411             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
412         }
413
414         if (input.getConstant() == null) {
415             final RpcError error = RpcResultBuilder.newError(
416                     ErrorType.RPC, "Invalid input.", "Constant value is null");
417             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
418         }
419
420         if (routedRegistrations.containsKey(input.getContext())) {
421             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
422                     "There is already a rpc registered for context: " + input.getContext());
423             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
424         }
425
426         final DOMRpcImplementationRegistration<RoutedGetConstantService> registration =
427                 RoutedGetConstantService.registerNew(bindingNormalizedNodeSerializer, domRpcService,
428                         input.getConstant(), input.getContext());
429
430         routedRegistrations.put(input.getContext(), registration);
431         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
432     }
433
434     @Override
435     public Future<RpcResult<Void>> registerFlappingSingleton() {
436         LOG.debug("Received register-flapping-singleton.");
437
438         if (flappingSingletonService != null) {
439             final RpcError error = RpcResultBuilder.newError(
440                     ErrorType.RPC, "Registration present.", "flappin-singleton already registered");
441             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
442         }
443
444         flappingSingletonService = new FlappingSingletonService(singletonService);
445
446         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
447     }
448
449     @Override
450     public Future<RpcResult<UnsubscribeDtclOutput>> unsubscribeDtcl() {
451         LOG.debug("Received unsubscribe-dtcl");
452
453         if (idIntsListener == null || dtclReg == null) {
454             final RpcError error = RpcResultBuilder.newError(
455                     ErrorType.RPC, "Dtcl missing.", "No DataTreeChangeListener registered.");
456             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed().withRpcError(error).build());
457         }
458
459         final DOMDataReadOnlyTransaction rTx = domDataBroker.newReadOnlyTransaction();
460         try {
461             if (dtclReg != null) {
462                 dtclReg.close();
463                 dtclReg = null;
464             }
465
466
467
468             final Optional<NormalizedNode<?, ?>> readResult =
469                     rTx.read(CONTROLLER_CONFIG, WriteTransactionsHandler.ID_INT_YID).checkedGet();
470
471             if (!readResult.isPresent()) {
472                 final RpcError error = RpcResultBuilder.newError(
473                         ErrorType.APPLICATION, "Final read empty.", "No data read from id-ints list.");
474                 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
475                         .withRpcError(error).build());
476             }
477
478             return Futures.immediateFuture(
479                     RpcResultBuilder.success(new UnsubscribeDtclOutputBuilder()
480                             .setCopyMatches(idIntsListener.checkEqual(readResult.get()))).build());
481
482         } catch (final ReadFailedException e) {
483             final RpcError error = RpcResultBuilder.newError(
484                     ErrorType.APPLICATION, "Read failed.", "Final read from id-ints failed.");
485             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
486                     .withRpcError(error).build());
487
488         }
489     }
490
491     @Override
492     public Future<RpcResult<Void>> createPrefixShard(final CreatePrefixShardInput input) {
493         LOG.debug("create-prefix-shard, input: {}", input);
494
495         return prefixShardHandler.onCreatePrefixShard(input);
496     }
497
498     @Override
499     public Future<RpcResult<Void>> deconfigureIdIntsShard() {
500         return null;
501     }
502
503     @Override
504     public Future<RpcResult<UnsubscribeYnlOutput>> unsubscribeYnl(final UnsubscribeYnlInput input) {
505         LOG.debug("Received unsubscribe-ynl, input: {}", input);
506
507         if (!ynlRegistrations.containsKey(input.getId())) {
508             final RpcError rpcError = RpcResultBuilder
509                     .newError(ErrorType.APPLICATION, "missing-registration", "No ynl listener with this id registered.");
510             final RpcResult<UnsubscribeYnlOutput> result =
511                     RpcResultBuilder.<UnsubscribeYnlOutput>failed().withRpcError(rpcError).build();
512             return Futures.immediateFuture(result);
513         }
514
515         final ListenerRegistration<YnlListener> registration = ynlRegistrations.remove(input.getId());
516         final UnsubscribeYnlOutput output = registration.getInstance().getOutput();
517
518         registration.close();
519
520         return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeYnlOutput>success().withResult(output).build());
521     }
522
523     @Override
524     public Future<RpcResult<CheckPublishNotificationsOutput>> checkPublishNotifications(
525             final CheckPublishNotificationsInput input) {
526
527         final PublishNotificationsTask task = publishNotificationsTasks.get(input.getId());
528
529         if (task == null) {
530             return Futures.immediateFuture(RpcResultBuilder.success(
531                     new CheckPublishNotificationsOutputBuilder().setActive(false)).build());
532         }
533
534         final CheckPublishNotificationsOutputBuilder checkPublishNotificationsOutputBuilder =
535                 new CheckPublishNotificationsOutputBuilder().setActive(!task.isFinished());
536
537         if (task.getLastError() != null) {
538             final StringWriter sw = new StringWriter();
539             final PrintWriter pw = new PrintWriter(sw);
540             task.getLastError().printStackTrace(pw);
541             checkPublishNotificationsOutputBuilder.setLastError(task.getLastError().toString() + sw.toString());
542         }
543
544         final CheckPublishNotificationsOutput output =
545                 checkPublishNotificationsOutputBuilder.setPublishCount(task.getCurrentNotif()).build();
546
547         return Futures.immediateFuture(RpcResultBuilder.success(output).build());
548     }
549
550     @Override
551     public Future<RpcResult<ProduceTransactionsOutput>> produceTransactions(final ProduceTransactionsInput input) {
552         LOG.debug("producer-transactions, input: {}", input);
553
554         final ProduceTransactionsHandler handler =
555                 new ProduceTransactionsHandler(domDataTreeService, input);
556
557         final SettableFuture<RpcResult<ProduceTransactionsOutput>> settableFuture = SettableFuture.create();
558         handler.start(settableFuture);
559
560         return settableFuture;
561     }
562
563     @Override
564     public Future<RpcResult<Void>> registerConstant(final RegisterConstantInput input) {
565
566         LOG.debug("Received register-constant rpc, input: {}", input);
567
568         if (input.getConstant() == null) {
569             final RpcError error = RpcResultBuilder.newError(
570                     ErrorType.RPC, "Invalid input.", "Constant value is null");
571             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
572         }
573
574         if (globalGetConstantRegistration != null) {
575             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
576                     "There is already a get-constant rpc registered.");
577             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
578         }
579
580         globalGetConstantRegistration = GetConstantService.registerNew(domRpcService, input.getConstant());
581         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
582     }
583
584     @Override
585     public Future<RpcResult<Void>> unregisterDefaultConstant() {
586         return null;
587     }
588
589     @Override
590     public Future<RpcResult<UnsubscribeDdtlOutput>> unsubscribeDdtl() {
591         LOG.debug("Received unsubscribe-ddtl.");
592
593         if (idIntsDdtl == null || ddtlReg == null) {
594             final RpcError error = RpcResultBuilder.newError(
595                     ErrorType.RPC, "Ddtl missing.", "No DOMDataTreeListener registered.");
596             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed().withRpcError(error).build());
597         }
598
599         ddtlReg.close();
600         ddtlReg = null;
601
602         final String shardName = ClusterUtils.getCleanShardName(ProduceTransactionsHandler.ID_INTS_YID);
603         LOG.debug("Creating distributed datastore client for shard {}", shardName);
604
605         final ActorContext actorContext = configDataStore.getActorContext();
606         final Props distributedDataStoreClientProps =
607                 SimpleDataStoreClientActor.props(actorContext.getCurrentMemberName(),
608                         "Shard-" + shardName, actorContext, shardName);
609
610         final ActorRef clientActor = actorSystem.actorOf(distributedDataStoreClientProps);
611         final DataStoreClient distributedDataStoreClient;
612         try {
613             distributedDataStoreClient = SimpleDataStoreClientActor
614                     .getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS);
615         } catch (final Exception e) {
616             LOG.error("Failed to get actor for {}", distributedDataStoreClientProps, e);
617             clientActor.tell(PoisonPill.getInstance(), noSender());
618             final RpcError error = RpcResultBuilder.newError(
619                     ErrorType.APPLICATION, "Unable to create ds client for read.",
620                     "Unable to create ds client for read.");
621             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
622                     .withRpcError(error).build());
623         }
624
625         final ClientLocalHistory localHistory = distributedDataStoreClient.createLocalHistory();
626         final ClientTransaction tx = localHistory.createTransaction();
627         final CheckedFuture<Optional<NormalizedNode<?, ?>>,
628                 org.opendaylight.mdsal.common.api.ReadFailedException> read =
629                 tx.read(YangInstanceIdentifier.of(ProduceTransactionsHandler.ID_INT));
630
631         tx.abort();
632         localHistory.close();
633         try {
634             final Optional<NormalizedNode<?, ?>> optional = read.checkedGet();
635             if (!optional.isPresent()) {
636                 LOG.warn("Final read from client is empty.");
637                 final RpcError error = RpcResultBuilder.newError(
638                         ErrorType.APPLICATION, "Read failed.", "Final read from id-ints is empty.");
639                 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
640                         .withRpcError(error).build());
641             }
642
643             return Futures.immediateFuture(
644                     RpcResultBuilder.success(new UnsubscribeDdtlOutputBuilder()
645                             .setCopyMatches(idIntsDdtl.checkEqual(optional.get()))).build());
646
647         } catch (org.opendaylight.mdsal.common.api.ReadFailedException e) {
648             LOG.error("Unable to read data to verify ddtl data.", e);
649             final RpcError error = RpcResultBuilder.newError(
650                     ErrorType.APPLICATION, "Read failed.", "Final read from id-ints failed.");
651             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
652                     .withRpcError(error).build());
653         } finally {
654             distributedDataStoreClient.close();
655             clientActor.tell(PoisonPill.getInstance(), noSender());
656         }
657     }
658 }