f0569fa07dc4e8842d92504168b117832b99acec
[controller.git] / opendaylight / md-sal / samples / clustering-test-app / provider / src / main / java / org / opendaylight / controller / clustering / it / provider / MdsalLowLevelTestProvider.java
1 /*
2  * Copyright (c) 2017 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.clustering.it.provider;
10
11 import static akka.actor.ActorRef.noSender;
12
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSystem;
15 import akka.actor.PoisonPill;
16 import akka.actor.Props;
17 import akka.dispatch.OnComplete;
18 import akka.pattern.Patterns;
19 import com.google.common.base.Optional;
20 import com.google.common.base.Strings;
21 import com.google.common.util.concurrent.CheckedFuture;
22 import com.google.common.util.concurrent.Futures;
23 import com.google.common.util.concurrent.SettableFuture;
24 import java.io.PrintWriter;
25 import java.io.StringWriter;
26 import java.util.Collections;
27 import java.util.HashMap;
28 import java.util.Map;
29 import java.util.concurrent.Future;
30 import java.util.concurrent.TimeUnit;
31 import org.opendaylight.controller.cluster.ActorSystemProvider;
32 import org.opendaylight.controller.cluster.databroker.actors.dds.ClientLocalHistory;
33 import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
34 import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
35 import org.opendaylight.controller.cluster.databroker.actors.dds.SimpleDataStoreClientActor;
36 import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
37 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
38 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
39 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
40 import org.opendaylight.controller.cluster.sharding.DistributedShardFactory;
41 import org.opendaylight.controller.clustering.it.provider.impl.FlappingSingletonService;
42 import org.opendaylight.controller.clustering.it.provider.impl.GetConstantService;
43 import org.opendaylight.controller.clustering.it.provider.impl.IdIntsDOMDataTreeLIstener;
44 import org.opendaylight.controller.clustering.it.provider.impl.IdIntsListener;
45 import org.opendaylight.controller.clustering.it.provider.impl.PrefixLeaderHandler;
46 import org.opendaylight.controller.clustering.it.provider.impl.PrefixShardHandler;
47 import org.opendaylight.controller.clustering.it.provider.impl.ProduceTransactionsHandler;
48 import org.opendaylight.controller.clustering.it.provider.impl.PublishNotificationsTask;
49 import org.opendaylight.controller.clustering.it.provider.impl.RoutedGetConstantService;
50 import org.opendaylight.controller.clustering.it.provider.impl.SingletonGetConstantService;
51 import org.opendaylight.controller.clustering.it.provider.impl.WriteTransactionsHandler;
52 import org.opendaylight.controller.clustering.it.provider.impl.YnlListener;
53 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
54 import org.opendaylight.controller.md.sal.binding.api.NotificationService;
55 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
56 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
57 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
58 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
59 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeService;
60 import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationRegistration;
61 import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService;
62 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
63 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
64 import org.opendaylight.controller.sal.core.api.model.SchemaService;
65 import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
66 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
67 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
68 import org.opendaylight.mdsal.dom.api.DOMDataTreeLoopException;
69 import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
70 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
71 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
72 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.AddShardReplicaInput;
73 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomePrefixLeaderInput;
74 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsInput;
75 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutput;
76 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutputBuilder;
77 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CreatePrefixShardInput;
78 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.IsClientAbortedOutput;
79 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.OdlMdsalLowlevelControlService;
80 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsInput;
81 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutput;
82 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantInput;
83 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantInput;
84 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterDefaultConstantInput;
85 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantInput;
86 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardInput;
87 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemoveShardReplicaInput;
88 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownShardReplicaInput;
89 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsInput;
90 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlInput;
91 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantInput;
92 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutput;
93 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutputBuilder;
94 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutput;
95 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutputBuilder;
96 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutput;
97 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutputBuilder;
98 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlInput;
99 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlOutput;
100 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsInput;
101 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsOutput;
102 import org.opendaylight.yangtools.concepts.ListenerRegistration;
103 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
104 import org.opendaylight.yangtools.yang.common.RpcError;
105 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
106 import org.opendaylight.yangtools.yang.common.RpcResult;
107 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
108 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
109 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
110 import org.slf4j.Logger;
111 import org.slf4j.LoggerFactory;
112 import scala.concurrent.duration.FiniteDuration;
113
114 public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService {
115
116     private static final Logger LOG = LoggerFactory.getLogger(MdsalLowLevelTestProvider.class);
117     private static final org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType CONTROLLER_CONFIG =
118             org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;
119
120     private final RpcProviderRegistry rpcRegistry;
121     private final BindingAwareBroker.RpcRegistration<OdlMdsalLowlevelControlService> registration;
122     private final DistributedShardFactory distributedShardFactory;
123     private final DistributedDataStoreInterface configDataStore;
124     private final DOMDataTreeService domDataTreeService;
125     private final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer;
126     private final DOMDataBroker domDataBroker;
127     private final NotificationPublishService notificationPublishService;
128     private final NotificationService notificationService;
129     private final SchemaService schemaService;
130     private final ClusterSingletonServiceProvider singletonService;
131     private final DOMRpcProviderService domRpcService;
132     private final PrefixLeaderHandler prefixLeaderHandler;
133     private final PrefixShardHandler prefixShardHandler;
134     private final DOMDataTreeChangeService domDataTreeChangeService;
135     private final ActorSystem actorSystem;
136
137     private Map<InstanceIdentifier<?>, DOMRpcImplementationRegistration<RoutedGetConstantService>> routedRegistrations =
138             new HashMap<>();
139
140     private Map<String, ListenerRegistration<YnlListener>> ynlRegistrations = new HashMap<>();
141
142     private DOMRpcImplementationRegistration<GetConstantService> globalGetConstantRegistration = null;
143     private ClusterSingletonServiceRegistration getSingletonConstantRegistration;
144     private FlappingSingletonService flappingSingletonService;
145     private ListenerRegistration<DOMDataTreeChangeListener> dtclReg;
146     private IdIntsListener idIntsListener;
147     private Map<String, PublishNotificationsTask> publishNotificationsTasks = new HashMap<>();
148     private ListenerRegistration<IdIntsDOMDataTreeLIstener> ddtlReg;
149     private IdIntsDOMDataTreeLIstener idIntsDdtl;
150
151
152
153     public MdsalLowLevelTestProvider(final RpcProviderRegistry rpcRegistry,
154                                      final DOMRpcProviderService domRpcService,
155                                      final ClusterSingletonServiceProvider singletonService,
156                                      final SchemaService schemaService,
157                                      final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer,
158                                      final NotificationPublishService notificationPublishService,
159                                      final NotificationService notificationService,
160                                      final DOMDataBroker domDataBroker,
161                                      final DOMDataTreeService domDataTreeService,
162                                      final DistributedShardFactory distributedShardFactory,
163                                      final DistributedDataStoreInterface configDataStore,
164                                      final ActorSystemProvider actorSystemProvider) {
165         this.rpcRegistry = rpcRegistry;
166         this.domRpcService = domRpcService;
167         this.singletonService = singletonService;
168         this.schemaService = schemaService;
169         this.bindingNormalizedNodeSerializer = bindingNormalizedNodeSerializer;
170         this.notificationPublishService = notificationPublishService;
171         this.notificationService = notificationService;
172         this.domDataBroker = domDataBroker;
173         this.domDataTreeService = domDataTreeService;
174         this.distributedShardFactory = distributedShardFactory;
175         this.configDataStore = configDataStore;
176         this.actorSystem = actorSystemProvider.getActorSystem();
177
178         this.prefixLeaderHandler = new PrefixLeaderHandler(domDataTreeService, bindingNormalizedNodeSerializer);
179
180         domDataTreeChangeService =
181                 (DOMDataTreeChangeService) domDataBroker.getSupportedExtensions().get(DOMDataTreeChangeService.class);
182
183         registration = rpcRegistry.addRpcImplementation(OdlMdsalLowlevelControlService.class, this);
184
185         prefixShardHandler = new PrefixShardHandler(distributedShardFactory, domDataTreeService,
186                 bindingNormalizedNodeSerializer);
187     }
188
189     @Override
190     public Future<RpcResult<Void>> unregisterSingletonConstant() {
191         LOG.debug("unregister-singleton-constant");
192
193         if (getSingletonConstantRegistration == null) {
194             LOG.debug("No get-singleton-constant registration present.");
195             final RpcError rpcError = RpcResultBuilder
196                     .newError(ErrorType.APPLICATION, "missing-registration", "No get-singleton-constant rpc registration present.");
197             final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
198             return Futures.immediateFuture(result);
199         }
200
201         try {
202             getSingletonConstantRegistration.close();
203             getSingletonConstantRegistration = null;
204
205             return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
206         } catch (final Exception e) {
207             LOG.debug("There was a problem closing the singleton constant service", e);
208             final RpcError rpcError = RpcResultBuilder
209                     .newError(ErrorType.APPLICATION, "error-closing", "There was a problem closing get-singleton-constant");
210             final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
211             return Futures.immediateFuture(result);
212         }
213     }
214
215     @Override
216     public Future<RpcResult<Void>> startPublishNotifications(final StartPublishNotificationsInput input) {
217         LOG.debug("publish-notifications, input: {}", input);
218
219         final PublishNotificationsTask task = new PublishNotificationsTask(notificationPublishService, input.getId(),
220                 input.getSeconds(), input.getNotificationsPerSecond());
221
222         publishNotificationsTasks.put(input.getId(), task);
223
224         task.start();
225
226         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
227     }
228
229     @Override
230     public Future<RpcResult<Void>> subscribeDtcl() {
231
232         if (dtclReg != null) {
233             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
234                     "There is already dataTreeChangeListener registered on id-ints list.");
235             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
236         }
237
238         idIntsListener = new IdIntsListener();
239
240         dtclReg = domDataTreeChangeService
241                 .registerDataTreeChangeListener(
242                         new org.opendaylight.controller.md.sal.dom.api.DOMDataTreeIdentifier(
243                                 CONTROLLER_CONFIG, WriteTransactionsHandler.ID_INT_YID),
244                         idIntsListener);
245
246         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
247     }
248
249     @Override
250     public Future<RpcResult<WriteTransactionsOutput>> writeTransactions(final WriteTransactionsInput input) {
251         LOG.debug("write-transactions, input: {}", input);
252
253         final WriteTransactionsHandler writeTransactionsHandler = new WriteTransactionsHandler(domDataBroker, input);
254
255         final SettableFuture<RpcResult<WriteTransactionsOutput>> settableFuture = SettableFuture.create();
256         writeTransactionsHandler.start(settableFuture);
257
258         return settableFuture;
259     }
260
261     @Override
262     public Future<RpcResult<IsClientAbortedOutput>> isClientAborted() {
263         return null;
264     }
265
266     @Override
267     public Future<RpcResult<Void>> removeShardReplica(RemoveShardReplicaInput input) {
268         return null;
269     }
270
271     @Override
272     public Future<RpcResult<Void>> subscribeYnl(final SubscribeYnlInput input) {
273
274         LOG.debug("subscribe-ynl, input: {}", input);
275
276         if (ynlRegistrations.containsKey(input.getId())) {
277             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
278                     "There is already ynl listener registered for this id: " + input.getId());
279             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
280         }
281
282         ynlRegistrations.put(input.getId(),
283                 notificationService.registerNotificationListener(new YnlListener(input.getId())));
284
285         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
286     }
287
288     @Override
289     public Future<RpcResult<Void>> removePrefixShard(final RemovePrefixShardInput input) {
290         LOG.debug("remove-prefix-shard, input: {}", input);
291
292         return prefixShardHandler.onRemovePrefixShard(input);
293     }
294
295     @Override
296     public Future<RpcResult<Void>> becomePrefixLeader(final BecomePrefixLeaderInput input) {
297         LOG.debug("become-prefix-leader, input: {}", input);
298
299         return prefixLeaderHandler.makeLeaderLocal(input);
300     }
301
302     @Override
303     public Future<RpcResult<Void>> unregisterBoundConstant(final UnregisterBoundConstantInput input) {
304         LOG.debug("unregister-bound-constant, {}", input);
305
306         final DOMRpcImplementationRegistration<RoutedGetConstantService> registration =
307                 routedRegistrations.remove(input.getContext());
308
309         if (registration == null) {
310             LOG.debug("No get-contexted-constant registration for context: {}", input.getContext());
311             final RpcError rpcError = RpcResultBuilder
312                     .newError(ErrorType.APPLICATION, "missing-registration", "No get-constant rpc registration present.");
313             final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
314             return Futures.immediateFuture(result);
315         }
316
317         registration.close();
318         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
319     }
320
321     @Override
322     public Future<RpcResult<Void>> registerSingletonConstant(final RegisterSingletonConstantInput input) {
323
324         LOG.debug("Received register-singleton-constant rpc, input: {}", input);
325
326         if (input.getConstant() == null) {
327             final RpcError error = RpcResultBuilder.newError(
328                     ErrorType.RPC, "Invalid input.", "Constant value is null");
329             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
330         }
331
332         getSingletonConstantRegistration =
333                 SingletonGetConstantService.registerNew(singletonService, domRpcService, input.getConstant());
334
335         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
336     }
337
338     @Override
339     public Future<RpcResult<Void>> registerDefaultConstant(RegisterDefaultConstantInput input) {
340         return null;
341     }
342
343     @Override
344     public Future<RpcResult<Void>> unregisterConstant() {
345
346         if (globalGetConstantRegistration == null) {
347             final RpcError rpcError = RpcResultBuilder
348                     .newError(ErrorType.APPLICATION, "missing-registration", "No get-constant rpc registration present.");
349             final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
350             return Futures.immediateFuture(result);
351         }
352
353         globalGetConstantRegistration.close();
354         globalGetConstantRegistration = null;
355
356         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
357     }
358
359     @Override
360     public Future<RpcResult<UnregisterFlappingSingletonOutput>> unregisterFlappingSingleton() {
361         LOG.debug("unregister-flapping-singleton received.");
362
363         if (flappingSingletonService == null) {
364             final RpcError rpcError = RpcResultBuilder
365                     .newError(ErrorType.APPLICATION, "missing-registration", "No flapping-singleton registration present.");
366             final RpcResult<UnregisterFlappingSingletonOutput> result =
367                     RpcResultBuilder.<UnregisterFlappingSingletonOutput>failed().withRpcError(rpcError).build();
368             return Futures.immediateFuture(result);
369         }
370
371         final long flapCount = flappingSingletonService.setInactive();
372         flappingSingletonService = null;
373
374         final UnregisterFlappingSingletonOutput output =
375                 new UnregisterFlappingSingletonOutputBuilder().setFlapCount(flapCount).build();
376
377         return Futures.immediateFuture(RpcResultBuilder.success(output).build());
378     }
379
380     @Override
381     public Future<RpcResult<Void>> addShardReplica(AddShardReplicaInput input) {
382         return null;
383     }
384
385     @Override
386     public Future<RpcResult<Void>> subscribeDdtl() {
387
388         if (ddtlReg != null) {
389             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
390                     "There is already dataTreeChangeListener registered on id-ints list.");
391             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
392         }
393
394         idIntsDdtl = new IdIntsDOMDataTreeLIstener();
395
396         try {
397             ddtlReg =
398                     domDataTreeService.registerListener(idIntsDdtl,
399                             Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION,
400                                     ProduceTransactionsHandler.ID_INT_YID))
401                             , true, Collections.emptyList());
402         } catch (DOMDataTreeLoopException e) {
403             LOG.error("Failed to register DOMDataTreeListener.", e);
404
405         }
406
407         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
408     }
409
410     @Override
411     public Future<RpcResult<Void>> registerBoundConstant(final RegisterBoundConstantInput input) {
412         LOG.debug("register-bound-constant: {}", input);
413
414         if (input.getContext() == null) {
415             final RpcError error = RpcResultBuilder.newError(
416                     ErrorType.RPC, "Invalid input.", "Context value is null");
417             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
418         }
419
420         if (input.getConstant() == null) {
421             final RpcError error = RpcResultBuilder.newError(
422                     ErrorType.RPC, "Invalid input.", "Constant value is null");
423             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
424         }
425
426         if (routedRegistrations.containsKey(input.getContext())) {
427             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
428                     "There is already a rpc registered for context: " + input.getContext());
429             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
430         }
431
432         final DOMRpcImplementationRegistration<RoutedGetConstantService> registration =
433                 RoutedGetConstantService.registerNew(bindingNormalizedNodeSerializer, domRpcService,
434                         input.getConstant(), input.getContext());
435
436         routedRegistrations.put(input.getContext(), registration);
437         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
438     }
439
440     @Override
441     public Future<RpcResult<Void>> registerFlappingSingleton() {
442         LOG.debug("Received register-flapping-singleton.");
443
444         if (flappingSingletonService != null) {
445             final RpcError error = RpcResultBuilder.newError(
446                     ErrorType.RPC, "Registration present.", "flapping-singleton already registered");
447             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
448         }
449
450         flappingSingletonService = new FlappingSingletonService(singletonService);
451
452         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
453     }
454
455     @Override
456     public Future<RpcResult<UnsubscribeDtclOutput>> unsubscribeDtcl() {
457         LOG.debug("Received unsubscribe-dtcl");
458
459         if (idIntsListener == null || dtclReg == null) {
460             final RpcError error = RpcResultBuilder.newError(
461                     ErrorType.RPC, "Dtcl missing.", "No DataTreeChangeListener registered.");
462             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed().withRpcError(error).build());
463         }
464
465         dtclReg.close();
466         dtclReg = null;
467
468         if (!idIntsListener.hasTriggered()) {
469             final RpcError error = RpcResultBuilder.newError(
470                     ErrorType.APPLICATION, "No notification received.", "id-ints listener has not received" +
471                             "any notifications.");
472             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
473                     .withRpcError(error).build());
474         }
475
476         final DOMDataReadOnlyTransaction rTx = domDataBroker.newReadOnlyTransaction();
477         try {
478             final Optional<NormalizedNode<?, ?>> readResult =
479                     rTx.read(CONTROLLER_CONFIG, WriteTransactionsHandler.ID_INT_YID).checkedGet();
480
481             if (!readResult.isPresent()) {
482                 final RpcError error = RpcResultBuilder.newError(
483                         ErrorType.APPLICATION, "Final read empty.", "No data read from id-ints list.");
484                 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
485                         .withRpcError(error).build());
486             }
487
488             return Futures.immediateFuture(
489                     RpcResultBuilder.success(new UnsubscribeDtclOutputBuilder()
490                             .setCopyMatches(idIntsListener.checkEqual(readResult.get()))).build());
491
492         } catch (final ReadFailedException e) {
493             final RpcError error = RpcResultBuilder.newError(
494                     ErrorType.APPLICATION, "Read failed.", "Final read from id-ints failed.");
495             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
496                     .withRpcError(error).build());
497
498         }
499     }
500
501     @Override
502     public Future<RpcResult<Void>> createPrefixShard(final CreatePrefixShardInput input) {
503         LOG.debug("create-prefix-shard, input: {}", input);
504
505         return prefixShardHandler.onCreatePrefixShard(input);
506     }
507
508     @Override
509     public Future<RpcResult<Void>> deconfigureIdIntsShard() {
510         return null;
511     }
512
513     @Override
514     public Future<RpcResult<UnsubscribeYnlOutput>> unsubscribeYnl(final UnsubscribeYnlInput input) {
515         LOG.debug("Received unsubscribe-ynl, input: {}", input);
516
517         if (!ynlRegistrations.containsKey(input.getId())) {
518             final RpcError rpcError = RpcResultBuilder
519                     .newError(ErrorType.APPLICATION, "missing-registration", "No ynl listener with this id registered.");
520             final RpcResult<UnsubscribeYnlOutput> result =
521                     RpcResultBuilder.<UnsubscribeYnlOutput>failed().withRpcError(rpcError).build();
522             return Futures.immediateFuture(result);
523         }
524
525         final ListenerRegistration<YnlListener> registration = ynlRegistrations.remove(input.getId());
526         final UnsubscribeYnlOutput output = registration.getInstance().getOutput();
527
528         registration.close();
529
530         return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeYnlOutput>success().withResult(output).build());
531     }
532
533     @Override
534     public Future<RpcResult<CheckPublishNotificationsOutput>> checkPublishNotifications(
535             final CheckPublishNotificationsInput input) {
536
537         final PublishNotificationsTask task = publishNotificationsTasks.get(input.getId());
538
539         if (task == null) {
540             return Futures.immediateFuture(RpcResultBuilder.success(
541                     new CheckPublishNotificationsOutputBuilder().setActive(false)).build());
542         }
543
544         final CheckPublishNotificationsOutputBuilder checkPublishNotificationsOutputBuilder =
545                 new CheckPublishNotificationsOutputBuilder().setActive(!task.isFinished());
546
547         if (task.getLastError() != null) {
548             final StringWriter sw = new StringWriter();
549             final PrintWriter pw = new PrintWriter(sw);
550             task.getLastError().printStackTrace(pw);
551             checkPublishNotificationsOutputBuilder.setLastError(task.getLastError().toString() + sw.toString());
552         }
553
554         final CheckPublishNotificationsOutput output =
555                 checkPublishNotificationsOutputBuilder.setPublishCount(task.getCurrentNotif()).build();
556
557         return Futures.immediateFuture(RpcResultBuilder.success(output).build());
558     }
559
560     @Override
561     public Future<RpcResult<ProduceTransactionsOutput>> produceTransactions(final ProduceTransactionsInput input) {
562         LOG.debug("producer-transactions, input: {}", input);
563
564         final ProduceTransactionsHandler handler =
565                 new ProduceTransactionsHandler(domDataTreeService, input);
566
567         final SettableFuture<RpcResult<ProduceTransactionsOutput>> settableFuture = SettableFuture.create();
568         handler.start(settableFuture);
569
570         return settableFuture;
571     }
572
573     @Override
574     public Future<RpcResult<Void>> shutdownShardReplica(final ShutdownShardReplicaInput input) {
575         LOG.debug("Received shutdown-shard-replica rpc, input: {}", input);
576
577         final String shardName = input.getShardName();
578         if (Strings.isNullOrEmpty(shardName)) {
579             final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "bad-element",
580                     "A valid shard name must be specified");
581             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(rpcError).build());
582         }
583
584         final SettableFuture<RpcResult<Void>> rpcResult = SettableFuture.create();
585         final ActorContext context = configDataStore.getActorContext();
586
587         long timeoutInMS = Math.max(context.getDatastoreContext().getShardRaftConfig()
588                 .getElectionTimeOutInterval().$times(3).toMillis(), 10000);
589         final FiniteDuration duration = FiniteDuration.apply(timeoutInMS, TimeUnit.MILLISECONDS);
590         final scala.concurrent.Promise<Boolean> shutdownShardAsk = akka.dispatch.Futures.promise();
591
592         context.findLocalShardAsync(shardName).onComplete(new OnComplete<ActorRef>() {
593             @Override
594             public void onComplete(final Throwable throwable, final ActorRef actorRef) throws Throwable {
595                 if (throwable != null) {
596                     shutdownShardAsk.failure(throwable);
597                 } else {
598                     shutdownShardAsk.completeWith(Patterns.gracefulStop(actorRef, duration, Shutdown.INSTANCE));
599                 }
600             }
601         }, context.getClientDispatcher());
602
603         shutdownShardAsk.future().onComplete(new OnComplete<Boolean>() {
604             @Override
605             public void onComplete(final Throwable throwable, final Boolean gracefulStopResult) throws Throwable {
606                 if (throwable != null) {
607                     final RpcResult<Void> failedResult = RpcResultBuilder.<Void>failed()
608                             .withError(ErrorType.APPLICATION, "Failed to gracefully shutdown shard", throwable).build();
609                     rpcResult.set(failedResult);
610                 } else {
611                     // according to Patterns.gracefulStop API, we don't have to
612                     // check value of gracefulStopResult
613                     rpcResult.set(RpcResultBuilder.<Void>success().build());
614                 }
615             }
616         }, context.getClientDispatcher());
617
618         return rpcResult;
619     }
620
621     @Override
622     public Future<RpcResult<Void>> registerConstant(final RegisterConstantInput input) {
623
624         LOG.debug("Received register-constant rpc, input: {}", input);
625
626         if (input.getConstant() == null) {
627             final RpcError error = RpcResultBuilder.newError(
628                     ErrorType.RPC, "Invalid input.", "Constant value is null");
629             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
630         }
631
632         if (globalGetConstantRegistration != null) {
633             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
634                     "There is already a get-constant rpc registered.");
635             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
636         }
637
638         globalGetConstantRegistration = GetConstantService.registerNew(domRpcService, input.getConstant());
639         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
640     }
641
642     @Override
643     public Future<RpcResult<Void>> unregisterDefaultConstant() {
644         return null;
645     }
646
647     @Override
648     public Future<RpcResult<UnsubscribeDdtlOutput>> unsubscribeDdtl() {
649         LOG.debug("Received unsubscribe-ddtl.");
650
651         if (idIntsDdtl == null || ddtlReg == null) {
652             final RpcError error = RpcResultBuilder.newError(
653                     ErrorType.RPC, "Ddtl missing.", "No DOMDataTreeListener registered.");
654             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed().withRpcError(error).build());
655         }
656
657         ddtlReg.close();
658         ddtlReg = null;
659
660         if (!idIntsDdtl.hasTriggered()) {
661             final RpcError error = RpcResultBuilder.newError(
662                     ErrorType.APPLICATION, "No notification received.", "id-ints listener has not received" +
663                             "any notifications.");
664             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
665                     .withRpcError(error).build());
666         }
667
668         final String shardName = ClusterUtils.getCleanShardName(ProduceTransactionsHandler.ID_INTS_YID);
669         LOG.debug("Creating distributed datastore client for shard {}", shardName);
670
671         final ActorContext actorContext = configDataStore.getActorContext();
672         final Props distributedDataStoreClientProps =
673                 SimpleDataStoreClientActor.props(actorContext.getCurrentMemberName(),
674                         "Shard-" + shardName, actorContext, shardName);
675
676         final ActorRef clientActor = actorSystem.actorOf(distributedDataStoreClientProps);
677         final DataStoreClient distributedDataStoreClient;
678         try {
679             distributedDataStoreClient = SimpleDataStoreClientActor
680                     .getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS);
681         } catch (final Exception e) {
682             LOG.error("Failed to get actor for {}", distributedDataStoreClientProps, e);
683             clientActor.tell(PoisonPill.getInstance(), noSender());
684             final RpcError error = RpcResultBuilder.newError(
685                     ErrorType.APPLICATION, "Unable to create ds client for read.",
686                     "Unable to create ds client for read.");
687             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
688                     .withRpcError(error).build());
689         }
690
691         final ClientLocalHistory localHistory = distributedDataStoreClient.createLocalHistory();
692         final ClientTransaction tx = localHistory.createTransaction();
693         final CheckedFuture<Optional<NormalizedNode<?, ?>>,
694                 org.opendaylight.mdsal.common.api.ReadFailedException> read =
695                 tx.read(YangInstanceIdentifier.of(ProduceTransactionsHandler.ID_INT));
696
697         tx.abort();
698         localHistory.close();
699         try {
700             final Optional<NormalizedNode<?, ?>> optional = read.checkedGet();
701             if (!optional.isPresent()) {
702                 LOG.warn("Final read from client is empty.");
703                 final RpcError error = RpcResultBuilder.newError(
704                         ErrorType.APPLICATION, "Read failed.", "Final read from id-ints is empty.");
705                 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
706                         .withRpcError(error).build());
707             }
708
709             return Futures.immediateFuture(
710                     RpcResultBuilder.success(new UnsubscribeDdtlOutputBuilder()
711                             .setCopyMatches(idIntsDdtl.checkEqual(optional.get()))).build());
712
713         } catch (org.opendaylight.mdsal.common.api.ReadFailedException e) {
714             LOG.error("Unable to read data to verify ddtl data.", e);
715             final RpcError error = RpcResultBuilder.newError(
716                     ErrorType.APPLICATION, "Read failed.", "Final read from id-ints failed.");
717             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
718                     .withRpcError(error).build());
719         } finally {
720             distributedDataStoreClient.close();
721             clientActor.tell(PoisonPill.getInstance(), noSender());
722         }
723     }
724 }