BUG-8733: refactor IdInts listeners
[controller.git] / opendaylight / md-sal / samples / clustering-test-app / provider / src / main / java / org / opendaylight / controller / clustering / it / provider / MdsalLowLevelTestProvider.java
1 /*
2  * Copyright (c) 2017 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.clustering.it.provider;
10
11 import static akka.actor.ActorRef.noSender;
12 import static org.opendaylight.yangtools.yang.common.RpcResultBuilder.newError;
13
14 import akka.actor.ActorRef;
15 import akka.actor.ActorSystem;
16 import akka.actor.PoisonPill;
17 import akka.actor.Props;
18 import akka.dispatch.OnComplete;
19 import akka.pattern.Patterns;
20 import com.google.common.base.Optional;
21 import com.google.common.base.Strings;
22 import com.google.common.util.concurrent.CheckedFuture;
23 import com.google.common.util.concurrent.Futures;
24 import com.google.common.util.concurrent.ListenableFuture;
25 import com.google.common.util.concurrent.SettableFuture;
26 import java.io.PrintWriter;
27 import java.io.StringWriter;
28 import java.util.Collections;
29 import java.util.HashMap;
30 import java.util.Map;
31 import java.util.Objects;
32 import java.util.concurrent.Future;
33 import java.util.concurrent.TimeUnit;
34 import org.opendaylight.controller.cluster.ActorSystemProvider;
35 import org.opendaylight.controller.cluster.databroker.actors.dds.ClientLocalHistory;
36 import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
37 import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
38 import org.opendaylight.controller.cluster.databroker.actors.dds.SimpleDataStoreClientActor;
39 import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
40 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
41 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
42 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
43 import org.opendaylight.controller.cluster.sharding.DistributedShardFactory;
44 import org.opendaylight.controller.clustering.it.provider.impl.DataListenerState;
45 import org.opendaylight.controller.clustering.it.provider.impl.FlappingSingletonService;
46 import org.opendaylight.controller.clustering.it.provider.impl.GetConstantService;
47 import org.opendaylight.controller.clustering.it.provider.impl.IdIntsDOMDataTreeLIstener;
48 import org.opendaylight.controller.clustering.it.provider.impl.IdIntsListener;
49 import org.opendaylight.controller.clustering.it.provider.impl.PrefixLeaderHandler;
50 import org.opendaylight.controller.clustering.it.provider.impl.PrefixShardHandler;
51 import org.opendaylight.controller.clustering.it.provider.impl.ProduceTransactionsHandler;
52 import org.opendaylight.controller.clustering.it.provider.impl.PublishNotificationsTask;
53 import org.opendaylight.controller.clustering.it.provider.impl.RoutedGetConstantService;
54 import org.opendaylight.controller.clustering.it.provider.impl.SingletonGetConstantService;
55 import org.opendaylight.controller.clustering.it.provider.impl.WriteTransactionsHandler;
56 import org.opendaylight.controller.clustering.it.provider.impl.YnlListener;
57 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
58 import org.opendaylight.controller.md.sal.binding.api.NotificationService;
59 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
60 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
61 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
62 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
63 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeService;
64 import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationRegistration;
65 import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService;
66 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
67 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
68 import org.opendaylight.controller.sal.core.api.model.SchemaService;
69 import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
70 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
71 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
72 import org.opendaylight.mdsal.dom.api.DOMDataTreeLoopException;
73 import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
74 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
75 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
76 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.AddShardReplicaInput;
77 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomePrefixLeaderInput;
78 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsInput;
79 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutput;
80 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutputBuilder;
81 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CreatePrefixShardInput;
82 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.IsClientAbortedOutput;
83 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.OdlMdsalLowlevelControlService;
84 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsInput;
85 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutput;
86 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantInput;
87 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantInput;
88 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterDefaultConstantInput;
89 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantInput;
90 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardInput;
91 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemoveShardReplicaInput;
92 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownPrefixShardReplicaInput;
93 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownShardReplicaInput;
94 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsInput;
95 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlInput;
96 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantInput;
97 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutput;
98 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutputBuilder;
99 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutput;
100 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutputBuilder;
101 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutput;
102 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutputBuilder;
103 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlInput;
104 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlOutput;
105 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsInput;
106 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsOutput;
107 import org.opendaylight.yangtools.concepts.ListenerRegistration;
108 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
109 import org.opendaylight.yangtools.yang.common.RpcError;
110 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
111 import org.opendaylight.yangtools.yang.common.RpcResult;
112 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
113 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
114 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
115 import org.slf4j.Logger;
116 import org.slf4j.LoggerFactory;
117 import scala.concurrent.duration.FiniteDuration;
118
119 public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService {
120
    private static final Logger LOG = LoggerFactory.getLogger(MdsalLowLevelTestProvider.class);
    // Controller-flavoured CONFIGURATION datastore type. It is spelled out in full because the
    // LogicalDatastoreType imported by this file is the org.opendaylight.mdsal one.
    private static final org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType CONTROLLER_CONFIG =
            org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;

    // Shared result returned by the Void RPCs of this class on success.
    private static final ListenableFuture<RpcResult<Void>> VOID_SUCCESS = success(null);

    // Collaborators handed in through (or derived in) the constructor.
    private final RpcProviderRegistry rpcRegistry;
    // Registration of this instance as the OdlMdsalLowlevelControlService implementation.
    private final BindingAwareBroker.RpcRegistration<OdlMdsalLowlevelControlService> registration;
    private final DistributedShardFactory distributedShardFactory;
    private final DistributedDataStoreInterface configDataStore;
    private final DOMDataTreeService domDataTreeService;
    private final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer;
    private final DOMDataBroker domDataBroker;
    private final NotificationPublishService notificationPublishService;
    private final NotificationService notificationService;
    private final SchemaService schemaService;
    private final ClusterSingletonServiceProvider singletonService;
    private final DOMRpcProviderService domRpcService;
    private final PrefixLeaderHandler prefixLeaderHandler;
    private final PrefixShardHandler prefixShardHandler;
    // Looked up from the supported extensions of domDataBroker in the constructor.
    private final DOMDataTreeChangeService domDataTreeChangeService;
    private final ActorSystem actorSystem;

    // Routed get-constant registrations, keyed by the context they were registered for.
    private final Map<InstanceIdentifier<?>, DOMRpcImplementationRegistration<RoutedGetConstantService>> routedRegistrations =
            new HashMap<>();

    // YnlListener registrations, keyed by the id supplied to subscribe-ynl.
    private final Map<String, ListenerRegistration<YnlListener>> ynlRegistrations = new HashMap<>();

    // Mutable RPC state below; null means "nothing registered at the moment".
    private DOMRpcImplementationRegistration<GetConstantService> globalGetConstantRegistration = null;
    private ClusterSingletonServiceRegistration getSingletonConstantRegistration;
    private FlappingSingletonService flappingSingletonService;
    // Data tree change listener on id-ints and its registration (subscribe-dtcl / unsubscribe-dtcl).
    private ListenerRegistration<DOMDataTreeChangeListener> dtclReg;
    private IdIntsListener idIntsListener;
    // Notification publishing tasks, keyed by the id supplied to start-publish-notifications.
    private final Map<String, PublishNotificationsTask> publishNotificationsTasks = new HashMap<>();
    // DOM data tree listener on id-ints and its registration (subscribe-ddtl / unsubscribe-ddtl).
    private ListenerRegistration<IdIntsDOMDataTreeLIstener> ddtlReg;
    private IdIntsDOMDataTreeLIstener idIntsDdtl;
157
158
159
160     public MdsalLowLevelTestProvider(final RpcProviderRegistry rpcRegistry,
161                                      final DOMRpcProviderService domRpcService,
162                                      final ClusterSingletonServiceProvider singletonService,
163                                      final SchemaService schemaService,
164                                      final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer,
165                                      final NotificationPublishService notificationPublishService,
166                                      final NotificationService notificationService,
167                                      final DOMDataBroker domDataBroker,
168                                      final DOMDataTreeService domDataTreeService,
169                                      final DistributedShardFactory distributedShardFactory,
170                                      final DistributedDataStoreInterface configDataStore,
171                                      final ActorSystemProvider actorSystemProvider) {
172         this.rpcRegistry = rpcRegistry;
173         this.domRpcService = domRpcService;
174         this.singletonService = singletonService;
175         this.schemaService = schemaService;
176         this.bindingNormalizedNodeSerializer = bindingNormalizedNodeSerializer;
177         this.notificationPublishService = notificationPublishService;
178         this.notificationService = notificationService;
179         this.domDataBroker = domDataBroker;
180         this.domDataTreeService = domDataTreeService;
181         this.distributedShardFactory = distributedShardFactory;
182         this.configDataStore = configDataStore;
183         this.actorSystem = actorSystemProvider.getActorSystem();
184
185         this.prefixLeaderHandler = new PrefixLeaderHandler(domDataTreeService, bindingNormalizedNodeSerializer);
186
187         domDataTreeChangeService =
188                 (DOMDataTreeChangeService) domDataBroker.getSupportedExtensions().get(DOMDataTreeChangeService.class);
189
190         registration = rpcRegistry.addRpcImplementation(OdlMdsalLowlevelControlService.class, this);
191
192         prefixShardHandler = new PrefixShardHandler(distributedShardFactory, domDataTreeService,
193                 bindingNormalizedNodeSerializer);
194     }
195
196     @Override
197     public Future<RpcResult<Void>> unregisterSingletonConstant() {
198         LOG.debug("unregister-singleton-constant");
199
200         if (getSingletonConstantRegistration == null) {
201             LOG.debug("No get-singleton-constant registration present.");
202             return failure(newError(ErrorType.APPLICATION, "missing-registration",
203                 "No get-singleton-constant rpc registration present."));
204         }
205
206         try {
207             getSingletonConstantRegistration.close();
208         } catch (final Exception e) {
209             LOG.debug("There was a problem closing the singleton constant service", e);
210             return failure(newError(ErrorType.APPLICATION, "error-closing",
211                 "There was a problem closing get-singleton-constant"));
212         } finally {
213             getSingletonConstantRegistration = null;
214         }
215
216         return VOID_SUCCESS;
217     }
218
219     @Override
220     public Future<RpcResult<Void>> startPublishNotifications(final StartPublishNotificationsInput input) {
221         LOG.debug("publish-notifications, input: {}", input);
222
223         final PublishNotificationsTask task = new PublishNotificationsTask(notificationPublishService, input.getId(),
224                 input.getSeconds(), input.getNotificationsPerSecond());
225
226         publishNotificationsTasks.put(input.getId(), task);
227
228         task.start();
229
230         return VOID_SUCCESS;
231     }
232
233     @Override
234     public Future<RpcResult<Void>> subscribeDtcl() {
235         if (dtclReg != null) {
236             return failure(newError(ErrorType.RPC, "Registration present.",
237                     "There is already dataTreeChangeListener registered on id-ints list."));
238         }
239
240         idIntsListener = new IdIntsListener();
241
242         dtclReg = domDataTreeChangeService
243                 .registerDataTreeChangeListener(
244                         new org.opendaylight.controller.md.sal.dom.api.DOMDataTreeIdentifier(
245                                 CONTROLLER_CONFIG, WriteTransactionsHandler.ID_INT_YID),
246                         idIntsListener);
247
248         LOG.debug("ClusteredDOMDataTreeChangeListener registered");
249         return VOID_SUCCESS;
250     }
251
252     @Override
253     public Future<RpcResult<WriteTransactionsOutput>> writeTransactions(final WriteTransactionsInput input) {
254         LOG.debug("write-transactions, input: {}", input);
255         return WriteTransactionsHandler.start(domDataBroker, input);
256     }
257
258     @Override
259     public Future<RpcResult<IsClientAbortedOutput>> isClientAborted() {
260         // FIXME: implement this
261         return null;
262     }
263
264     @Override
265     public Future<RpcResult<Void>> removeShardReplica(final RemoveShardReplicaInput input) {
266         // FIXME: implement this
267         return null;
268     }
269
270     @Override
271     public Future<RpcResult<Void>> subscribeYnl(final SubscribeYnlInput input) {
272         LOG.debug("subscribe-ynl, input: {}", input);
273
274         if (ynlRegistrations.containsKey(input.getId())) {
275             return failure(newError(ErrorType.RPC, "Registration present.",
276                     "There is already ynl listener registered for this id: " + input.getId()));
277         }
278
279         ynlRegistrations.put(input.getId(),
280                 notificationService.registerNotificationListener(new YnlListener(input.getId())));
281
282         return success(null);
283     }
284
285     @Override
286     public Future<RpcResult<Void>> removePrefixShard(final RemovePrefixShardInput input) {
287         LOG.debug("remove-prefix-shard, input: {}", input);
288
289         return prefixShardHandler.onRemovePrefixShard(input);
290     }
291
292     @Override
293     public Future<RpcResult<Void>> becomePrefixLeader(final BecomePrefixLeaderInput input) {
294         LOG.debug("become-prefix-leader, input: {}", input);
295
296         return prefixLeaderHandler.makeLeaderLocal(input);
297     }
298
299     @Override
300     public Future<RpcResult<Void>> unregisterBoundConstant(final UnregisterBoundConstantInput input) {
301         LOG.debug("unregister-bound-constant, {}", input);
302
303         final DOMRpcImplementationRegistration<RoutedGetConstantService> registration =
304                 routedRegistrations.remove(input.getContext());
305
306         if (registration == null) {
307             LOG.debug("No get-contexted-constant registration for context: {}", input.getContext());
308             return failure(newError(ErrorType.APPLICATION, "missing-registration",
309                 "No get-constant rpc registration present."));
310         }
311
312         registration.close();
313         return VOID_SUCCESS;
314     }
315
316     @Override
317     public Future<RpcResult<Void>> registerSingletonConstant(final RegisterSingletonConstantInput input) {
318         LOG.debug("Received register-singleton-constant rpc, input: {}", input);
319
320         if (input.getConstant() == null) {
321             return failure(newError(ErrorType.RPC, "Invalid input.", "Constant value is null"));
322         }
323
324         getSingletonConstantRegistration =
325                 SingletonGetConstantService.registerNew(singletonService, domRpcService, input.getConstant());
326
327         return VOID_SUCCESS;
328     }
329
330     @Override
331     public Future<RpcResult<Void>> registerDefaultConstant(final RegisterDefaultConstantInput input) {
332         // FIXME: implement this
333         return null;
334     }
335
336     @Override
337     public Future<RpcResult<Void>> unregisterConstant() {
338
339         if (globalGetConstantRegistration == null) {
340             return failure(newError(ErrorType.APPLICATION, "missing-registration",
341                 "No get-constant rpc registration present."));
342         }
343
344         globalGetConstantRegistration.close();
345         globalGetConstantRegistration = null;
346
347         return VOID_SUCCESS;
348     }
349
350     @Override
351     public Future<RpcResult<UnregisterFlappingSingletonOutput>> unregisterFlappingSingleton() {
352         LOG.debug("unregister-flapping-singleton received.");
353
354         if (flappingSingletonService == null) {
355             return failure(newError(ErrorType.APPLICATION, "missing-registration",
356                 "No flapping-singleton registration present."));
357         }
358
359         final long flapCount = flappingSingletonService.setInactive();
360         flappingSingletonService = null;
361
362         return success(new UnregisterFlappingSingletonOutputBuilder().setFlapCount(flapCount).build());
363     }
364
365     @Override
366     public Future<RpcResult<Void>> addShardReplica(final AddShardReplicaInput input) {
367         // FIXME: implement this
368         return null;
369     }
370
371     @Override
372     public Future<RpcResult<Void>> subscribeDdtl() {
373         if (ddtlReg != null) {
374             return failure(newError(ErrorType.RPC, "Registration present.",
375                     "There is already dataTreeChangeListener registered on id-ints list."));
376         }
377
378         idIntsDdtl = new IdIntsDOMDataTreeLIstener();
379
380         try {
381             ddtlReg = domDataTreeService.registerListener(idIntsDdtl,
382                 Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION,
383                     ProduceTransactionsHandler.ID_INT_YID)), true, Collections.emptyList());
384         } catch (DOMDataTreeLoopException e) {
385             LOG.error("Failed to register DOMDataTreeListener.", e);
386             return failure(newError(ErrorType.APPLICATION, "register-failed", e.getMessage()));
387         }
388
389         LOG.debug("DOMDataTreeListener registered");
390         return VOID_SUCCESS;
391     }
392
393     @Override
394     public Future<RpcResult<Void>> registerBoundConstant(final RegisterBoundConstantInput input) {
395         LOG.debug("register-bound-constant: {}", input);
396
397         if (input.getContext() == null) {
398             return failure(newError(ErrorType.RPC, "Invalid input.", "Context value is null"));
399         }
400
401         if (input.getConstant() == null) {
402             return failure(newError(ErrorType.RPC, "Invalid input.", "Constant value is null"));
403         }
404
405         if (routedRegistrations.containsKey(input.getContext())) {
406             return failure(newError(ErrorType.RPC, "Registration present.",
407                     "There is already a rpc registered for context: " + input.getContext()));
408         }
409
410         final DOMRpcImplementationRegistration<RoutedGetConstantService> registration =
411                 RoutedGetConstantService.registerNew(bindingNormalizedNodeSerializer, domRpcService,
412                         input.getConstant(), input.getContext());
413
414         routedRegistrations.put(input.getContext(), registration);
415         return VOID_SUCCESS;
416     }
417
418     @Override
419     public Future<RpcResult<Void>> registerFlappingSingleton() {
420         LOG.debug("Received register-flapping-singleton.");
421
422         if (flappingSingletonService != null) {
423             return failure(RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
424                 "flapping-singleton already registered"));
425         }
426
427         flappingSingletonService = new FlappingSingletonService(singletonService);
428         return VOID_SUCCESS;
429     }
430
431     @Override
432     public Future<RpcResult<UnsubscribeDtclOutput>> unsubscribeDtcl() {
433         LOG.debug("Received unsubscribe-dtcl");
434
435         if (idIntsListener == null || dtclReg == null) {
436             return failure(newError(ErrorType.RPC, "Dtcl missing.", "No DataTreeChangeListener registered."));
437         }
438
439         final ListenableFuture<DataListenerState> future = idIntsListener.tryFinishProcessing(dtclReg);
440         dtclReg = null;
441
442         return Futures.withFallback(Futures.transform(future, this::dtclOutput),
443             t -> failure(newError(ErrorType.RPC, "resource-denied-transport", "Failed to finish processing",
444                 "clustering-it", "clustering-it", t)));
445     }
446
447     private RpcResult<UnsubscribeDtclOutput> dtclOutput(final DataListenerState state) {
448         if (state.changeCount() == 0) {
449             return failed(newError(ErrorType.APPLICATION, "No notification received.",
450                 "id-ints listener has not received any notifications."));
451         }
452
453         final DOMDataReadOnlyTransaction rTx = domDataBroker.newReadOnlyTransaction();
454         final Optional<NormalizedNode<?, ?>> readResult;
455
456         try {
457             readResult = rTx.read(CONTROLLER_CONFIG, WriteTransactionsHandler.ID_INT_YID).checkedGet();
458         } catch (final ReadFailedException e) {
459             return failed(newError(ErrorType.APPLICATION, "Read failed.", "Final read from id-ints failed."));
460         } finally {
461             rTx.close();
462         }
463
464         final NormalizedNode<?, ?> expected = state.lastData().orNull();
465         final NormalizedNode<?, ?> actual = readResult.orNull();
466         final boolean equal = Objects.equals(expected, actual);
467         if (!equal) {
468             LOG.debug("Expected result {} read resulted in {}", expected, actual);
469         }
470
471         final RpcResultBuilder<UnsubscribeDtclOutput> b = RpcResultBuilder.success(new UnsubscribeDtclOutputBuilder()
472             .setCopyMatches(equal).build());
473
474 //        for (DataListenerViolation violation : state.violations()) {
475 //            final Optional<NormalizedNodeDiff> diff = violation.toDiff();
476 //            if (diff.isPresent()) {
477 //                b.withWarning(ErrorType.APPLICATION, "Sequence mismatch", diff.get().toString());
478 //            }
479 //        }
480
481         return b.build();
482     }
483
484     @Override
485     public Future<RpcResult<Void>> createPrefixShard(final CreatePrefixShardInput input) {
486         LOG.debug("create-prefix-shard, input: {}", input);
487
488         return prefixShardHandler.onCreatePrefixShard(input);
489     }
490
491     @Override
492     public Future<RpcResult<Void>> deconfigureIdIntsShard() {
493         // FIXME: implement this
494         return null;
495     }
496
497     @Override
498     public Future<RpcResult<UnsubscribeYnlOutput>> unsubscribeYnl(final UnsubscribeYnlInput input) {
499         LOG.debug("Received unsubscribe-ynl, input: {}", input);
500
501         if (!ynlRegistrations.containsKey(input.getId())) {
502             return failure(newError(ErrorType.APPLICATION, "missing-registration",
503                 "No ynl listener with this id registered."));
504         }
505
506         final ListenerRegistration<YnlListener> registration = ynlRegistrations.remove(input.getId());
507         final UnsubscribeYnlOutput output = registration.getInstance().getOutput();
508
509         registration.close();
510
511         return success(output);
512     }
513
514     @Override
515     public Future<RpcResult<CheckPublishNotificationsOutput>> checkPublishNotifications(
516             final CheckPublishNotificationsInput input) {
517
518         final PublishNotificationsTask task = publishNotificationsTasks.get(input.getId());
519         if (task == null) {
520             return success(new CheckPublishNotificationsOutputBuilder().setActive(Boolean.FALSE).build());
521         }
522
523         final CheckPublishNotificationsOutputBuilder checkPublishNotificationsOutputBuilder =
524                 new CheckPublishNotificationsOutputBuilder().setActive(!task.isFinished());
525
526         if (task.getLastError() != null) {
527             final StringWriter sw = new StringWriter();
528             final PrintWriter pw = new PrintWriter(sw);
529             task.getLastError().printStackTrace(pw);
530             checkPublishNotificationsOutputBuilder.setLastError(task.getLastError().toString() + sw.toString());
531         }
532
533         return success(checkPublishNotificationsOutputBuilder.setPublishCount(task.getCurrentNotif()).build());
534     }
535
536     @Override
537     public Future<RpcResult<ProduceTransactionsOutput>> produceTransactions(final ProduceTransactionsInput input) {
538         LOG.debug("producer-transactions, input: {}", input);
539         return ProduceTransactionsHandler.start(domDataTreeService, input);
540     }
541
542     @Override
543     public Future<RpcResult<Void>> shutdownShardReplica(final ShutdownShardReplicaInput input) {
544         LOG.debug("Received shutdown-shard-replica rpc, input: {}", input);
545
546         final String shardName = input.getShardName();
547         if (Strings.isNullOrEmpty(shardName)) {
548             return failure(newError(ErrorType.APPLICATION, "bad-element", "A valid shard name must be specified"));
549         }
550
551         return shutdownShardGracefully(shardName);
552     }
553
554     @Override
555     public Future<RpcResult<Void>> shutdownPrefixShardReplica(final ShutdownPrefixShardReplicaInput input) {
556         LOG.debug("Received shutdown-prefix-shard-replica rpc, input: {}", input);
557
558         final InstanceIdentifier<?> shardPrefix = input.getPrefix();
559         if (shardPrefix == null) {
560             return failure(newError(ErrorType.APPLICATION, "bad-element", "A valid shard prefix must be specified"));
561         }
562
563         final YangInstanceIdentifier shardPath = bindingNormalizedNodeSerializer.toYangInstanceIdentifier(shardPrefix);
564         final String cleanPrefixShardName = ClusterUtils.getCleanShardName(shardPath);
565
566         return shutdownShardGracefully(cleanPrefixShardName);
567     }
568
    /**
     * Asks the local replica of the named shard to stop and reports the outcome as an RPC result.
     *
     * <p>The shard actor is looked up with {@code findLocalShardAsync} on the config datastore's
     * {@link ActorContext}; once found, it is sent {@link Shutdown#INSTANCE} through
     * {@code Patterns.gracefulStop}. The stop is given three election timeout intervals, but never
     * less than 10 seconds.
     *
     * @param shardName name of the shard to stop
     * @return a future that is set to a success result when the stop completes without a throwable, or
     *         to a failed result carrying the throwable when the lookup or the stop fails
     */
    private SettableFuture<RpcResult<Void>> shutdownShardGracefully(final String shardName) {
        final SettableFuture<RpcResult<Void>> rpcResult = SettableFuture.create();
        final ActorContext context = configDataStore.getActorContext();

        // Three election timeout intervals, with a floor of 10000 ms.
        long timeoutInMS = Math.max(context.getDatastoreContext().getShardRaftConfig()
                .getElectionTimeOutInterval().$times(3).toMillis(), 10000);
        final FiniteDuration duration = FiniteDuration.apply(timeoutInMS, TimeUnit.MILLISECONDS);
        // Promise bridging the two asynchronous steps: shard lookup, then graceful stop.
        final scala.concurrent.Promise<Boolean> shutdownShardAsk = akka.dispatch.Futures.promise();

        // Step 1: find the local shard actor; a failed lookup fails the promise directly.
        context.findLocalShardAsync(shardName).onComplete(new OnComplete<ActorRef>() {
            @Override
            public void onComplete(final Throwable throwable, final ActorRef actorRef) throws Throwable {
                if (throwable != null) {
                    shutdownShardAsk.failure(throwable);
                } else {
                    shutdownShardAsk.completeWith(Patterns.gracefulStop(actorRef, duration, Shutdown.INSTANCE));
                }
            }
        }, context.getClientDispatcher());

        // Step 2: translate the promise's outcome into the RPC result.
        shutdownShardAsk.future().onComplete(new OnComplete<Boolean>() {
            @Override
            public void onComplete(final Throwable throwable, final Boolean gracefulStopResult) {
                if (throwable != null) {
                    rpcResult.set(RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION,
                        "Failed to gracefully shutdown shard", throwable).build());
                } else {
                    // according to Patterns.gracefulStop API, we don't have to
                    // check value of gracefulStopResult
                    rpcResult.set(RpcResultBuilder.<Void>success().build());
                }
            }
        }, context.getClientDispatcher());
        return rpcResult;
    }
604
605     @Override
606     public Future<RpcResult<Void>> registerConstant(final RegisterConstantInput input) {
607
608         LOG.debug("Received register-constant rpc, input: {}", input);
609
610         if (input.getConstant() == null) {
611             return failure(newError(ErrorType.RPC, "Invalid input.", "Constant value is null"));
612         }
613
614         if (globalGetConstantRegistration != null) {
615             return failure(newError(ErrorType.RPC, "Registration present.",
616                     "There is already a get-constant rpc registered."));
617         }
618
619         globalGetConstantRegistration = GetConstantService.registerNew(domRpcService, input.getConstant());
620         return VOID_SUCCESS;
621     }
622
623     @Override
624     public Future<RpcResult<Void>> unregisterDefaultConstant() {
625         // FIXME: implement this
626         return null;
627     }
628
629     @Override
630     public Future<RpcResult<UnsubscribeDdtlOutput>> unsubscribeDdtl() {
631         LOG.debug("Received unsubscribe-ddtl.");
632
633         if (idIntsDdtl == null || ddtlReg == null) {
634             return failure(newError(ErrorType.RPC, "Ddtl missing.", "No DOMDataTreeListener registered."));
635         }
636
637         final ListenableFuture<DataListenerState> future = idIntsDdtl.tryFinishProcessing(ddtlReg);
638         ddtlReg = null;
639
640         return Futures.withFallback(Futures.transform(future, this::ddtlOutput),
641             t -> failure(newError(ErrorType.RPC, "resource-denied-transport", "Failed to finish processing",
642                 "clustering-it", "clustering-it", t)));
643     }
644
645     private RpcResult<UnsubscribeDdtlOutput> ddtlOutput(final DataListenerState state) {
646         if (state.changeCount() == 0) {
647             return failed(newError(ErrorType.APPLICATION, "No notification received.",
648                 "id-ints listener has not received any notifications."));
649         }
650
651         final String shardName = ClusterUtils.getCleanShardName(ProduceTransactionsHandler.ID_INTS_YID);
652         LOG.debug("Creating distributed datastore client for shard {}", shardName);
653
654         final ActorContext actorContext = configDataStore.getActorContext();
655         final Props distributedDataStoreClientProps = SimpleDataStoreClientActor.props(
656             actorContext.getCurrentMemberName(), "Shard-" + shardName, actorContext, shardName);
657
658         final ActorRef clientActor = actorSystem.actorOf(distributedDataStoreClientProps);
659         final DataStoreClient distributedDataStoreClient;
660         try {
661             distributedDataStoreClient = SimpleDataStoreClientActor.getDistributedDataStoreClient(clientActor, 30,
662                 TimeUnit.SECONDS);
663         } catch (final Exception e) {
664             LOG.error("Failed to get actor for {}", distributedDataStoreClientProps, e);
665             clientActor.tell(PoisonPill.getInstance(), noSender());
666             return failed(newError(ErrorType.APPLICATION, "Unable to create ds client for read.",
667                     "Unable to create ds client for read."));
668         }
669
670         final ClientLocalHistory localHistory = distributedDataStoreClient.createLocalHistory();
671         final ClientTransaction tx = localHistory.createTransaction();
672         final CheckedFuture<Optional<NormalizedNode<?, ?>>,
673                 org.opendaylight.mdsal.common.api.ReadFailedException> read =
674                 tx.read(YangInstanceIdentifier.of(ProduceTransactionsHandler.ID_INT));
675
676         tx.abort();
677         localHistory.close();
678
679         final Optional<NormalizedNode<?, ?>> readResult;
680         try {
681             readResult = read.checkedGet();
682         } catch (org.opendaylight.mdsal.common.api.ReadFailedException e) {
683             LOG.error("Unable to read data to verify ddtl data.", e);
684             return failed(newError( ErrorType.APPLICATION, "Read failed.", "Final read from id-ints failed."));
685         } finally {
686             distributedDataStoreClient.close();
687             clientActor.tell(PoisonPill.getInstance(), noSender());
688         }
689
690         // FIXME run a diff
691         final NormalizedNode<?, ?> expected = state.lastData().orNull();
692         final NormalizedNode<?, ?> actual = readResult.orNull();
693         final boolean equal = Objects.equals(expected, actual);
694         if (!equal) {
695             LOG.debug("Expected result {} read resulted in {}", expected, actual);
696         }
697         final RpcResultBuilder<UnsubscribeDdtlOutput> b = RpcResultBuilder.success(new UnsubscribeDdtlOutputBuilder()
698             .setCopyMatches(equal).build());
699
700 //        for (DataListenerViolation violation : state.violations()) {
701 //            final Optional<NormalizedNodeDiff> diff = violation.toDiff();
702 //            if (diff.isPresent()) {
703 //                b.withWarning(ErrorType.APPLICATION, "Sequence mismatch", diff.get().toString());
704 //            }
705 //        }
706
707         return b.build();
708     }
709
710     private static <T> RpcResult<T> failed(final RpcError error) {
711         return RpcResultBuilder.<T>failed().withRpcError(error).build();
712     }
713
714     private static <T> ListenableFuture<RpcResult<T>> failure(final RpcError error) {
715         return Futures.immediateFuture(failed(error));
716     }
717
718     private static <T> ListenableFuture<RpcResult<T>> success(final T result) {
719         return Futures.immediateFuture(RpcResultBuilder.success(result).build());
720     }
721 }