688e7c0a51dab318a2fbb17f62d3e92980ac924e
[controller.git] / opendaylight / md-sal / samples / clustering-test-app / provider / src / main / java / org / opendaylight / controller / clustering / it / provider / MdsalLowLevelTestProvider.java
1 /*
2  * Copyright (c) 2017 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.clustering.it.provider;
10
11 import static akka.actor.ActorRef.noSender;
12
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSystem;
15 import akka.actor.PoisonPill;
16 import akka.actor.Props;
17 import akka.dispatch.OnComplete;
18 import akka.pattern.Patterns;
19 import com.google.common.base.Optional;
20 import com.google.common.base.Strings;
21 import com.google.common.util.concurrent.CheckedFuture;
22 import com.google.common.util.concurrent.Futures;
23 import com.google.common.util.concurrent.SettableFuture;
24 import java.io.PrintWriter;
25 import java.io.StringWriter;
26 import java.util.Collections;
27 import java.util.HashMap;
28 import java.util.Map;
29 import java.util.concurrent.Future;
30 import java.util.concurrent.TimeUnit;
31 import org.opendaylight.controller.cluster.ActorSystemProvider;
32 import org.opendaylight.controller.cluster.databroker.actors.dds.ClientLocalHistory;
33 import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
34 import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
35 import org.opendaylight.controller.cluster.databroker.actors.dds.SimpleDataStoreClientActor;
36 import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
37 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
38 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
39 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
40 import org.opendaylight.controller.cluster.sharding.DistributedShardFactory;
41 import org.opendaylight.controller.clustering.it.provider.impl.FlappingSingletonService;
42 import org.opendaylight.controller.clustering.it.provider.impl.GetConstantService;
43 import org.opendaylight.controller.clustering.it.provider.impl.IdIntsDOMDataTreeLIstener;
44 import org.opendaylight.controller.clustering.it.provider.impl.IdIntsListener;
45 import org.opendaylight.controller.clustering.it.provider.impl.PrefixLeaderHandler;
46 import org.opendaylight.controller.clustering.it.provider.impl.PrefixShardHandler;
47 import org.opendaylight.controller.clustering.it.provider.impl.ProduceTransactionsHandler;
48 import org.opendaylight.controller.clustering.it.provider.impl.PublishNotificationsTask;
49 import org.opendaylight.controller.clustering.it.provider.impl.RoutedGetConstantService;
50 import org.opendaylight.controller.clustering.it.provider.impl.SingletonGetConstantService;
51 import org.opendaylight.controller.clustering.it.provider.impl.WriteTransactionsHandler;
52 import org.opendaylight.controller.clustering.it.provider.impl.YnlListener;
53 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
54 import org.opendaylight.controller.md.sal.binding.api.NotificationService;
55 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
56 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
57 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
58 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
59 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeService;
60 import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationRegistration;
61 import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService;
62 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
63 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
64 import org.opendaylight.controller.sal.core.api.model.SchemaService;
65 import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
66 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
67 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
68 import org.opendaylight.mdsal.dom.api.DOMDataTreeLoopException;
69 import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
70 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
71 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
72 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.AddShardReplicaInput;
73 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomePrefixLeaderInput;
74 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsInput;
75 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutput;
76 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutputBuilder;
77 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CreatePrefixShardInput;
78 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.IsClientAbortedOutput;
79 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.OdlMdsalLowlevelControlService;
80 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsInput;
81 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutput;
82 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantInput;
83 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantInput;
84 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterDefaultConstantInput;
85 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantInput;
86 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardInput;
87 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemoveShardReplicaInput;
88 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownPrefixShardReplicaInput;
89 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownShardReplicaInput;
90 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsInput;
91 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlInput;
92 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantInput;
93 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutput;
94 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutputBuilder;
95 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutput;
96 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutputBuilder;
97 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutput;
98 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutputBuilder;
99 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlInput;
100 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlOutput;
101 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsInput;
102 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsOutput;
103 import org.opendaylight.yangtools.concepts.ListenerRegistration;
104 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
105 import org.opendaylight.yangtools.yang.common.RpcError;
106 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
107 import org.opendaylight.yangtools.yang.common.RpcResult;
108 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
109 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
110 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
111 import org.slf4j.Logger;
112 import org.slf4j.LoggerFactory;
113 import scala.concurrent.duration.FiniteDuration;
114
115 public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService {
116
117     private static final Logger LOG = LoggerFactory.getLogger(MdsalLowLevelTestProvider.class);
118     private static final org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType CONTROLLER_CONFIG =
119             org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;
120
121     private final RpcProviderRegistry rpcRegistry;
122     private final BindingAwareBroker.RpcRegistration<OdlMdsalLowlevelControlService> registration;
123     private final DistributedShardFactory distributedShardFactory;
124     private final DistributedDataStoreInterface configDataStore;
125     private final DOMDataTreeService domDataTreeService;
126     private final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer;
127     private final DOMDataBroker domDataBroker;
128     private final NotificationPublishService notificationPublishService;
129     private final NotificationService notificationService;
130     private final SchemaService schemaService;
131     private final ClusterSingletonServiceProvider singletonService;
132     private final DOMRpcProviderService domRpcService;
133     private final PrefixLeaderHandler prefixLeaderHandler;
134     private final PrefixShardHandler prefixShardHandler;
135     private final DOMDataTreeChangeService domDataTreeChangeService;
136     private final ActorSystem actorSystem;
137
138     private Map<InstanceIdentifier<?>, DOMRpcImplementationRegistration<RoutedGetConstantService>> routedRegistrations =
139             new HashMap<>();
140
141     private Map<String, ListenerRegistration<YnlListener>> ynlRegistrations = new HashMap<>();
142
143     private DOMRpcImplementationRegistration<GetConstantService> globalGetConstantRegistration = null;
144     private ClusterSingletonServiceRegistration getSingletonConstantRegistration;
145     private FlappingSingletonService flappingSingletonService;
146     private ListenerRegistration<DOMDataTreeChangeListener> dtclReg;
147     private IdIntsListener idIntsListener;
148     private Map<String, PublishNotificationsTask> publishNotificationsTasks = new HashMap<>();
149     private ListenerRegistration<IdIntsDOMDataTreeLIstener> ddtlReg;
150     private IdIntsDOMDataTreeLIstener idIntsDdtl;
151
152
153
154     public MdsalLowLevelTestProvider(final RpcProviderRegistry rpcRegistry,
155                                      final DOMRpcProviderService domRpcService,
156                                      final ClusterSingletonServiceProvider singletonService,
157                                      final SchemaService schemaService,
158                                      final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer,
159                                      final NotificationPublishService notificationPublishService,
160                                      final NotificationService notificationService,
161                                      final DOMDataBroker domDataBroker,
162                                      final DOMDataTreeService domDataTreeService,
163                                      final DistributedShardFactory distributedShardFactory,
164                                      final DistributedDataStoreInterface configDataStore,
165                                      final ActorSystemProvider actorSystemProvider) {
166         this.rpcRegistry = rpcRegistry;
167         this.domRpcService = domRpcService;
168         this.singletonService = singletonService;
169         this.schemaService = schemaService;
170         this.bindingNormalizedNodeSerializer = bindingNormalizedNodeSerializer;
171         this.notificationPublishService = notificationPublishService;
172         this.notificationService = notificationService;
173         this.domDataBroker = domDataBroker;
174         this.domDataTreeService = domDataTreeService;
175         this.distributedShardFactory = distributedShardFactory;
176         this.configDataStore = configDataStore;
177         this.actorSystem = actorSystemProvider.getActorSystem();
178
179         this.prefixLeaderHandler = new PrefixLeaderHandler(domDataTreeService, bindingNormalizedNodeSerializer);
180
181         domDataTreeChangeService =
182                 (DOMDataTreeChangeService) domDataBroker.getSupportedExtensions().get(DOMDataTreeChangeService.class);
183
184         registration = rpcRegistry.addRpcImplementation(OdlMdsalLowlevelControlService.class, this);
185
186         prefixShardHandler = new PrefixShardHandler(distributedShardFactory, domDataTreeService,
187                 bindingNormalizedNodeSerializer);
188     }
189
190     @Override
191     public Future<RpcResult<Void>> unregisterSingletonConstant() {
192         LOG.debug("unregister-singleton-constant");
193
194         if (getSingletonConstantRegistration == null) {
195             LOG.debug("No get-singleton-constant registration present.");
196             final RpcError rpcError = RpcResultBuilder
197                     .newError(ErrorType.APPLICATION, "missing-registration", "No get-singleton-constant rpc registration present.");
198             final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
199             return Futures.immediateFuture(result);
200         }
201
202         try {
203             getSingletonConstantRegistration.close();
204             getSingletonConstantRegistration = null;
205
206             return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
207         } catch (final Exception e) {
208             LOG.debug("There was a problem closing the singleton constant service", e);
209             final RpcError rpcError = RpcResultBuilder
210                     .newError(ErrorType.APPLICATION, "error-closing", "There was a problem closing get-singleton-constant");
211             final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
212             return Futures.immediateFuture(result);
213         }
214     }
215
216     @Override
217     public Future<RpcResult<Void>> startPublishNotifications(final StartPublishNotificationsInput input) {
218         LOG.debug("publish-notifications, input: {}", input);
219
220         final PublishNotificationsTask task = new PublishNotificationsTask(notificationPublishService, input.getId(),
221                 input.getSeconds(), input.getNotificationsPerSecond());
222
223         publishNotificationsTasks.put(input.getId(), task);
224
225         task.start();
226
227         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
228     }
229
230     @Override
231     public Future<RpcResult<Void>> subscribeDtcl() {
232
233         if (dtclReg != null) {
234             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
235                     "There is already dataTreeChangeListener registered on id-ints list.");
236             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
237         }
238
239         idIntsListener = new IdIntsListener();
240
241         dtclReg = domDataTreeChangeService
242                 .registerDataTreeChangeListener(
243                         new org.opendaylight.controller.md.sal.dom.api.DOMDataTreeIdentifier(
244                                 CONTROLLER_CONFIG, WriteTransactionsHandler.ID_INT_YID),
245                         idIntsListener);
246
247         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
248     }
249
250     @Override
251     public Future<RpcResult<WriteTransactionsOutput>> writeTransactions(final WriteTransactionsInput input) {
252         LOG.debug("write-transactions, input: {}", input);
253
254         final WriteTransactionsHandler writeTransactionsHandler = new WriteTransactionsHandler(domDataBroker, input);
255
256         final SettableFuture<RpcResult<WriteTransactionsOutput>> settableFuture = SettableFuture.create();
257         writeTransactionsHandler.start(settableFuture);
258
259         return settableFuture;
260     }
261
262     @Override
263     public Future<RpcResult<IsClientAbortedOutput>> isClientAborted() {
264         return null;
265     }
266
267     @Override
268     public Future<RpcResult<Void>> removeShardReplica(RemoveShardReplicaInput input) {
269         return null;
270     }
271
272     @Override
273     public Future<RpcResult<Void>> subscribeYnl(final SubscribeYnlInput input) {
274
275         LOG.debug("subscribe-ynl, input: {}", input);
276
277         if (ynlRegistrations.containsKey(input.getId())) {
278             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
279                     "There is already ynl listener registered for this id: " + input.getId());
280             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
281         }
282
283         ynlRegistrations.put(input.getId(),
284                 notificationService.registerNotificationListener(new YnlListener(input.getId())));
285
286         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
287     }
288
289     @Override
290     public Future<RpcResult<Void>> removePrefixShard(final RemovePrefixShardInput input) {
291         LOG.debug("remove-prefix-shard, input: {}", input);
292
293         return prefixShardHandler.onRemovePrefixShard(input);
294     }
295
296     @Override
297     public Future<RpcResult<Void>> becomePrefixLeader(final BecomePrefixLeaderInput input) {
298         LOG.debug("become-prefix-leader, input: {}", input);
299
300         return prefixLeaderHandler.makeLeaderLocal(input);
301     }
302
303     @Override
304     public Future<RpcResult<Void>> unregisterBoundConstant(final UnregisterBoundConstantInput input) {
305         LOG.debug("unregister-bound-constant, {}", input);
306
307         final DOMRpcImplementationRegistration<RoutedGetConstantService> registration =
308                 routedRegistrations.remove(input.getContext());
309
310         if (registration == null) {
311             LOG.debug("No get-contexted-constant registration for context: {}", input.getContext());
312             final RpcError rpcError = RpcResultBuilder
313                     .newError(ErrorType.APPLICATION, "missing-registration", "No get-constant rpc registration present.");
314             final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
315             return Futures.immediateFuture(result);
316         }
317
318         registration.close();
319         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
320     }
321
322     @Override
323     public Future<RpcResult<Void>> registerSingletonConstant(final RegisterSingletonConstantInput input) {
324
325         LOG.debug("Received register-singleton-constant rpc, input: {}", input);
326
327         if (input.getConstant() == null) {
328             final RpcError error = RpcResultBuilder.newError(
329                     ErrorType.RPC, "Invalid input.", "Constant value is null");
330             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
331         }
332
333         getSingletonConstantRegistration =
334                 SingletonGetConstantService.registerNew(singletonService, domRpcService, input.getConstant());
335
336         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
337     }
338
339     @Override
340     public Future<RpcResult<Void>> registerDefaultConstant(RegisterDefaultConstantInput input) {
341         return null;
342     }
343
344     @Override
345     public Future<RpcResult<Void>> unregisterConstant() {
346
347         if (globalGetConstantRegistration == null) {
348             final RpcError rpcError = RpcResultBuilder
349                     .newError(ErrorType.APPLICATION, "missing-registration", "No get-constant rpc registration present.");
350             final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
351             return Futures.immediateFuture(result);
352         }
353
354         globalGetConstantRegistration.close();
355         globalGetConstantRegistration = null;
356
357         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
358     }
359
360     @Override
361     public Future<RpcResult<UnregisterFlappingSingletonOutput>> unregisterFlappingSingleton() {
362         LOG.debug("unregister-flapping-singleton received.");
363
364         if (flappingSingletonService == null) {
365             final RpcError rpcError = RpcResultBuilder
366                     .newError(ErrorType.APPLICATION, "missing-registration", "No flapping-singleton registration present.");
367             final RpcResult<UnregisterFlappingSingletonOutput> result =
368                     RpcResultBuilder.<UnregisterFlappingSingletonOutput>failed().withRpcError(rpcError).build();
369             return Futures.immediateFuture(result);
370         }
371
372         final long flapCount = flappingSingletonService.setInactive();
373         flappingSingletonService = null;
374
375         final UnregisterFlappingSingletonOutput output =
376                 new UnregisterFlappingSingletonOutputBuilder().setFlapCount(flapCount).build();
377
378         return Futures.immediateFuture(RpcResultBuilder.success(output).build());
379     }
380
381     @Override
382     public Future<RpcResult<Void>> addShardReplica(AddShardReplicaInput input) {
383         return null;
384     }
385
386     @Override
387     public Future<RpcResult<Void>> subscribeDdtl() {
388
389         if (ddtlReg != null) {
390             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
391                     "There is already dataTreeChangeListener registered on id-ints list.");
392             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
393         }
394
395         idIntsDdtl = new IdIntsDOMDataTreeLIstener();
396
397         try {
398             ddtlReg =
399                     domDataTreeService.registerListener(idIntsDdtl,
400                             Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION,
401                                     ProduceTransactionsHandler.ID_INT_YID))
402                             , true, Collections.emptyList());
403         } catch (DOMDataTreeLoopException e) {
404             LOG.error("Failed to register DOMDataTreeListener.", e);
405
406         }
407
408         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
409     }
410
411     @Override
412     public Future<RpcResult<Void>> registerBoundConstant(final RegisterBoundConstantInput input) {
413         LOG.debug("register-bound-constant: {}", input);
414
415         if (input.getContext() == null) {
416             final RpcError error = RpcResultBuilder.newError(
417                     ErrorType.RPC, "Invalid input.", "Context value is null");
418             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
419         }
420
421         if (input.getConstant() == null) {
422             final RpcError error = RpcResultBuilder.newError(
423                     ErrorType.RPC, "Invalid input.", "Constant value is null");
424             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
425         }
426
427         if (routedRegistrations.containsKey(input.getContext())) {
428             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
429                     "There is already a rpc registered for context: " + input.getContext());
430             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
431         }
432
433         final DOMRpcImplementationRegistration<RoutedGetConstantService> registration =
434                 RoutedGetConstantService.registerNew(bindingNormalizedNodeSerializer, domRpcService,
435                         input.getConstant(), input.getContext());
436
437         routedRegistrations.put(input.getContext(), registration);
438         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
439     }
440
441     @Override
442     public Future<RpcResult<Void>> registerFlappingSingleton() {
443         LOG.debug("Received register-flapping-singleton.");
444
445         if (flappingSingletonService != null) {
446             final RpcError error = RpcResultBuilder.newError(
447                     ErrorType.RPC, "Registration present.", "flapping-singleton already registered");
448             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
449         }
450
451         flappingSingletonService = new FlappingSingletonService(singletonService);
452
453         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
454     }
455
456     @Override
457     public Future<RpcResult<UnsubscribeDtclOutput>> unsubscribeDtcl() {
458         LOG.debug("Received unsubscribe-dtcl");
459
460         if (idIntsListener == null || dtclReg == null) {
461             final RpcError error = RpcResultBuilder.newError(
462                     ErrorType.RPC, "Dtcl missing.", "No DataTreeChangeListener registered.");
463             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed().withRpcError(error).build());
464         }
465
466         dtclReg.close();
467         dtclReg = null;
468
469         if (!idIntsListener.hasTriggered()) {
470             final RpcError error = RpcResultBuilder.newError(
471                     ErrorType.APPLICATION, "No notification received.", "id-ints listener has not received" +
472                             "any notifications.");
473             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
474                     .withRpcError(error).build());
475         }
476
477         final DOMDataReadOnlyTransaction rTx = domDataBroker.newReadOnlyTransaction();
478         try {
479             final Optional<NormalizedNode<?, ?>> readResult =
480                     rTx.read(CONTROLLER_CONFIG, WriteTransactionsHandler.ID_INT_YID).checkedGet();
481
482             if (!readResult.isPresent()) {
483                 final RpcError error = RpcResultBuilder.newError(
484                         ErrorType.APPLICATION, "Final read empty.", "No data read from id-ints list.");
485                 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
486                         .withRpcError(error).build());
487             }
488
489             return Futures.immediateFuture(
490                     RpcResultBuilder.success(new UnsubscribeDtclOutputBuilder()
491                             .setCopyMatches(idIntsListener.checkEqual(readResult.get()))).build());
492
493         } catch (final ReadFailedException e) {
494             final RpcError error = RpcResultBuilder.newError(
495                     ErrorType.APPLICATION, "Read failed.", "Final read from id-ints failed.");
496             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
497                     .withRpcError(error).build());
498
499         }
500     }
501
502     @Override
503     public Future<RpcResult<Void>> createPrefixShard(final CreatePrefixShardInput input) {
504         LOG.debug("create-prefix-shard, input: {}", input);
505
506         return prefixShardHandler.onCreatePrefixShard(input);
507     }
508
509     @Override
510     public Future<RpcResult<Void>> deconfigureIdIntsShard() {
511         return null;
512     }
513
514     @Override
515     public Future<RpcResult<UnsubscribeYnlOutput>> unsubscribeYnl(final UnsubscribeYnlInput input) {
516         LOG.debug("Received unsubscribe-ynl, input: {}", input);
517
518         if (!ynlRegistrations.containsKey(input.getId())) {
519             final RpcError rpcError = RpcResultBuilder
520                     .newError(ErrorType.APPLICATION, "missing-registration", "No ynl listener with this id registered.");
521             final RpcResult<UnsubscribeYnlOutput> result =
522                     RpcResultBuilder.<UnsubscribeYnlOutput>failed().withRpcError(rpcError).build();
523             return Futures.immediateFuture(result);
524         }
525
526         final ListenerRegistration<YnlListener> registration = ynlRegistrations.remove(input.getId());
527         final UnsubscribeYnlOutput output = registration.getInstance().getOutput();
528
529         registration.close();
530
531         return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeYnlOutput>success().withResult(output).build());
532     }
533
534     @Override
535     public Future<RpcResult<CheckPublishNotificationsOutput>> checkPublishNotifications(
536             final CheckPublishNotificationsInput input) {
537
538         final PublishNotificationsTask task = publishNotificationsTasks.get(input.getId());
539
540         if (task == null) {
541             return Futures.immediateFuture(RpcResultBuilder.success(
542                     new CheckPublishNotificationsOutputBuilder().setActive(false)).build());
543         }
544
545         final CheckPublishNotificationsOutputBuilder checkPublishNotificationsOutputBuilder =
546                 new CheckPublishNotificationsOutputBuilder().setActive(!task.isFinished());
547
548         if (task.getLastError() != null) {
549             final StringWriter sw = new StringWriter();
550             final PrintWriter pw = new PrintWriter(sw);
551             task.getLastError().printStackTrace(pw);
552             checkPublishNotificationsOutputBuilder.setLastError(task.getLastError().toString() + sw.toString());
553         }
554
555         final CheckPublishNotificationsOutput output =
556                 checkPublishNotificationsOutputBuilder.setPublishCount(task.getCurrentNotif()).build();
557
558         return Futures.immediateFuture(RpcResultBuilder.success(output).build());
559     }
560
561     @Override
562     public Future<RpcResult<ProduceTransactionsOutput>> produceTransactions(final ProduceTransactionsInput input) {
563         LOG.debug("producer-transactions, input: {}", input);
564
565         final ProduceTransactionsHandler handler =
566                 new ProduceTransactionsHandler(domDataTreeService, input);
567
568         final SettableFuture<RpcResult<ProduceTransactionsOutput>> settableFuture = SettableFuture.create();
569         handler.start(settableFuture);
570
571         return settableFuture;
572     }
573
574     @Override
575     public Future<RpcResult<Void>> shutdownShardReplica(final ShutdownShardReplicaInput input) {
576         LOG.debug("Received shutdown-shard-replica rpc, input: {}", input);
577
578         final String shardName = input.getShardName();
579         if (Strings.isNullOrEmpty(shardName)) {
580             final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "bad-element",
581                     "A valid shard name must be specified");
582             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(rpcError).build());
583         }
584
585         return shutdownShardGracefully(shardName);
586     }
587
588     @Override
589     public Future<RpcResult<Void>> shutdownPrefixShardReplica(final ShutdownPrefixShardReplicaInput input) {
590         LOG.debug("Received shutdown-prefix-shard-replica rpc, input: {}", input);
591
592         final InstanceIdentifier shardPrefix = input.getPrefix();
593
594         if (shardPrefix == null) {
595             final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "bad-element",
596                     "A valid shard prefix must be specified");
597             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(rpcError).build());
598         }
599
600         final YangInstanceIdentifier shardPath = bindingNormalizedNodeSerializer.toYangInstanceIdentifier(shardPrefix);
601         final String cleanPrefixShardName = ClusterUtils.getCleanShardName(shardPath);
602
603         return shutdownShardGracefully(cleanPrefixShardName);
604     }
605
606     private SettableFuture<RpcResult<Void>> shutdownShardGracefully(final String shardName) {
607         final SettableFuture<RpcResult<Void>> rpcResult = SettableFuture.create();
608         final ActorContext context = configDataStore.getActorContext();
609
610         long timeoutInMS = Math.max(context.getDatastoreContext().getShardRaftConfig()
611                 .getElectionTimeOutInterval().$times(3).toMillis(), 10000);
612         final FiniteDuration duration = FiniteDuration.apply(timeoutInMS, TimeUnit.MILLISECONDS);
613         final scala.concurrent.Promise<Boolean> shutdownShardAsk = akka.dispatch.Futures.promise();
614
615         context.findLocalShardAsync(shardName).onComplete(new OnComplete<ActorRef>() {
616             @Override
617             public void onComplete(final Throwable throwable, final ActorRef actorRef) throws Throwable {
618                 if (throwable != null) {
619                     shutdownShardAsk.failure(throwable);
620                 } else {
621                     shutdownShardAsk.completeWith(Patterns.gracefulStop(actorRef, duration, Shutdown.INSTANCE));
622                 }
623             }
624         }, context.getClientDispatcher());
625
626         shutdownShardAsk.future().onComplete(new OnComplete<Boolean>() {
627             @Override
628             public void onComplete(final Throwable throwable, final Boolean gracefulStopResult) throws Throwable {
629                 if (throwable != null) {
630                     final RpcResult<Void> failedResult = RpcResultBuilder.<Void>failed()
631                             .withError(ErrorType.APPLICATION, "Failed to gracefully shutdown shard", throwable).build();
632                     rpcResult.set(failedResult);
633                 } else {
634                     // according to Patterns.gracefulStop API, we don't have to
635                     // check value of gracefulStopResult
636                     rpcResult.set(RpcResultBuilder.<Void>success().build());
637                 }
638             }
639         }, context.getClientDispatcher());
640         return rpcResult;
641     }
642
643     @Override
644     public Future<RpcResult<Void>> registerConstant(final RegisterConstantInput input) {
645
646         LOG.debug("Received register-constant rpc, input: {}", input);
647
648         if (input.getConstant() == null) {
649             final RpcError error = RpcResultBuilder.newError(
650                     ErrorType.RPC, "Invalid input.", "Constant value is null");
651             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
652         }
653
654         if (globalGetConstantRegistration != null) {
655             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
656                     "There is already a get-constant rpc registered.");
657             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
658         }
659
660         globalGetConstantRegistration = GetConstantService.registerNew(domRpcService, input.getConstant());
661         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
662     }
663
664     @Override
665     public Future<RpcResult<Void>> unregisterDefaultConstant() {
666         return null;
667     }
668
669     @Override
670     public Future<RpcResult<UnsubscribeDdtlOutput>> unsubscribeDdtl() {
671         LOG.debug("Received unsubscribe-ddtl.");
672
673         if (idIntsDdtl == null || ddtlReg == null) {
674             final RpcError error = RpcResultBuilder.newError(
675                     ErrorType.RPC, "Ddtl missing.", "No DOMDataTreeListener registered.");
676             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed().withRpcError(error).build());
677         }
678
679         ddtlReg.close();
680         ddtlReg = null;
681
682         if (!idIntsDdtl.hasTriggered()) {
683             final RpcError error = RpcResultBuilder.newError(
684                     ErrorType.APPLICATION, "No notification received.", "id-ints listener has not received" +
685                             "any notifications.");
686             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
687                     .withRpcError(error).build());
688         }
689
690         final String shardName = ClusterUtils.getCleanShardName(ProduceTransactionsHandler.ID_INTS_YID);
691         LOG.debug("Creating distributed datastore client for shard {}", shardName);
692
693         final ActorContext actorContext = configDataStore.getActorContext();
694         final Props distributedDataStoreClientProps =
695                 SimpleDataStoreClientActor.props(actorContext.getCurrentMemberName(),
696                         "Shard-" + shardName, actorContext, shardName);
697
698         final ActorRef clientActor = actorSystem.actorOf(distributedDataStoreClientProps);
699         final DataStoreClient distributedDataStoreClient;
700         try {
701             distributedDataStoreClient = SimpleDataStoreClientActor
702                     .getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS);
703         } catch (final Exception e) {
704             LOG.error("Failed to get actor for {}", distributedDataStoreClientProps, e);
705             clientActor.tell(PoisonPill.getInstance(), noSender());
706             final RpcError error = RpcResultBuilder.newError(
707                     ErrorType.APPLICATION, "Unable to create ds client for read.",
708                     "Unable to create ds client for read.");
709             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
710                     .withRpcError(error).build());
711         }
712
713         final ClientLocalHistory localHistory = distributedDataStoreClient.createLocalHistory();
714         final ClientTransaction tx = localHistory.createTransaction();
715         final CheckedFuture<Optional<NormalizedNode<?, ?>>,
716                 org.opendaylight.mdsal.common.api.ReadFailedException> read =
717                 tx.read(YangInstanceIdentifier.of(ProduceTransactionsHandler.ID_INT));
718
719         tx.abort();
720         localHistory.close();
721         try {
722             final Optional<NormalizedNode<?, ?>> optional = read.checkedGet();
723             if (!optional.isPresent()) {
724                 LOG.warn("Final read from client is empty.");
725                 final RpcError error = RpcResultBuilder.newError(
726                         ErrorType.APPLICATION, "Read failed.", "Final read from id-ints is empty.");
727                 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
728                         .withRpcError(error).build());
729             }
730
731             return Futures.immediateFuture(
732                     RpcResultBuilder.success(new UnsubscribeDdtlOutputBuilder()
733                             .setCopyMatches(idIntsDdtl.checkEqual(optional.get()))).build());
734
735         } catch (org.opendaylight.mdsal.common.api.ReadFailedException e) {
736             LOG.error("Unable to read data to verify ddtl data.", e);
737             final RpcError error = RpcResultBuilder.newError(
738                     ErrorType.APPLICATION, "Read failed.", "Final read from id-ints failed.");
739             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
740                     .withRpcError(error).build());
741         } finally {
742             distributedDataStoreClient.close();
743             clientActor.tell(PoisonPill.getInstance(), noSender());
744         }
745     }
746 }