BUG-8494: Cleanup clustering-it-provider
[controller.git] / opendaylight / md-sal / samples / clustering-test-app / provider / src / main / java / org / opendaylight / controller / clustering / it / provider / MdsalLowLevelTestProvider.java
1 /*
2  * Copyright (c) 2017 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.clustering.it.provider;
10
11 import static akka.actor.ActorRef.noSender;
12
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSystem;
15 import akka.actor.PoisonPill;
16 import akka.actor.Props;
17 import akka.dispatch.OnComplete;
18 import akka.pattern.Patterns;
19 import com.google.common.base.Optional;
20 import com.google.common.base.Strings;
21 import com.google.common.util.concurrent.CheckedFuture;
22 import com.google.common.util.concurrent.Futures;
23 import com.google.common.util.concurrent.SettableFuture;
24 import java.io.PrintWriter;
25 import java.io.StringWriter;
26 import java.util.Collections;
27 import java.util.HashMap;
28 import java.util.Map;
29 import java.util.concurrent.ExecutionException;
30 import java.util.concurrent.Future;
31 import java.util.concurrent.TimeUnit;
32 import java.util.concurrent.TimeoutException;
33 import org.opendaylight.controller.cluster.ActorSystemProvider;
34 import org.opendaylight.controller.cluster.databroker.actors.dds.ClientLocalHistory;
35 import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
36 import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
37 import org.opendaylight.controller.cluster.databroker.actors.dds.SimpleDataStoreClientActor;
38 import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
39 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
40 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
41 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
42 import org.opendaylight.controller.cluster.sharding.DistributedShardFactory;
43 import org.opendaylight.controller.clustering.it.provider.impl.FlappingSingletonService;
44 import org.opendaylight.controller.clustering.it.provider.impl.GetConstantService;
45 import org.opendaylight.controller.clustering.it.provider.impl.IdIntsDOMDataTreeLIstener;
46 import org.opendaylight.controller.clustering.it.provider.impl.IdIntsListener;
47 import org.opendaylight.controller.clustering.it.provider.impl.PrefixLeaderHandler;
48 import org.opendaylight.controller.clustering.it.provider.impl.PrefixShardHandler;
49 import org.opendaylight.controller.clustering.it.provider.impl.ProduceTransactionsHandler;
50 import org.opendaylight.controller.clustering.it.provider.impl.PublishNotificationsTask;
51 import org.opendaylight.controller.clustering.it.provider.impl.RoutedGetConstantService;
52 import org.opendaylight.controller.clustering.it.provider.impl.SingletonGetConstantService;
53 import org.opendaylight.controller.clustering.it.provider.impl.WriteTransactionsHandler;
54 import org.opendaylight.controller.clustering.it.provider.impl.YnlListener;
55 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
56 import org.opendaylight.controller.md.sal.binding.api.NotificationService;
57 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
58 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
59 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
60 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
61 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeService;
62 import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationRegistration;
63 import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService;
64 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
65 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
66 import org.opendaylight.controller.sal.core.api.model.SchemaService;
67 import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
68 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
69 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
70 import org.opendaylight.mdsal.dom.api.DOMDataTreeLoopException;
71 import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
72 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
73 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
74 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.AddShardReplicaInput;
75 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomePrefixLeaderInput;
76 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsInput;
77 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutput;
78 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutputBuilder;
79 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CreatePrefixShardInput;
80 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.IsClientAbortedOutput;
81 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.OdlMdsalLowlevelControlService;
82 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsInput;
83 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutput;
84 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantInput;
85 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantInput;
86 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterDefaultConstantInput;
87 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantInput;
88 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardInput;
89 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemoveShardReplicaInput;
90 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownPrefixShardReplicaInput;
91 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownShardReplicaInput;
92 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsInput;
93 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlInput;
94 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantInput;
95 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutput;
96 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutputBuilder;
97 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutput;
98 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutputBuilder;
99 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutput;
100 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutputBuilder;
101 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlInput;
102 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlOutput;
103 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsInput;
104 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsOutput;
105 import org.opendaylight.yangtools.concepts.ListenerRegistration;
106 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
107 import org.opendaylight.yangtools.yang.common.RpcError;
108 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
109 import org.opendaylight.yangtools.yang.common.RpcResult;
110 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
111 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
112 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
113 import org.slf4j.Logger;
114 import org.slf4j.LoggerFactory;
115 import scala.concurrent.duration.FiniteDuration;
116
117 public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService {
118
119     private static final Logger LOG = LoggerFactory.getLogger(MdsalLowLevelTestProvider.class);
120     private static final org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType CONTROLLER_CONFIG =
121             org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;
122
123     private final RpcProviderRegistry rpcRegistry;
124     private final BindingAwareBroker.RpcRegistration<OdlMdsalLowlevelControlService> registration;
125     private final DistributedShardFactory distributedShardFactory;
126     private final DistributedDataStoreInterface configDataStore;
127     private final DOMDataTreeService domDataTreeService;
128     private final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer;
129     private final DOMDataBroker domDataBroker;
130     private final NotificationPublishService notificationPublishService;
131     private final NotificationService notificationService;
132     private final SchemaService schemaService;
133     private final ClusterSingletonServiceProvider singletonService;
134     private final DOMRpcProviderService domRpcService;
135     private final PrefixLeaderHandler prefixLeaderHandler;
136     private final PrefixShardHandler prefixShardHandler;
137     private final DOMDataTreeChangeService domDataTreeChangeService;
138     private final ActorSystem actorSystem;
139
140     private final Map<InstanceIdentifier<?>, DOMRpcImplementationRegistration<RoutedGetConstantService>> routedRegistrations =
141             new HashMap<>();
142
143     private final Map<String, ListenerRegistration<YnlListener>> ynlRegistrations = new HashMap<>();
144
145     private DOMRpcImplementationRegistration<GetConstantService> globalGetConstantRegistration = null;
146     private ClusterSingletonServiceRegistration getSingletonConstantRegistration;
147     private FlappingSingletonService flappingSingletonService;
148     private ListenerRegistration<DOMDataTreeChangeListener> dtclReg;
149     private IdIntsListener idIntsListener;
150     private final Map<String, PublishNotificationsTask> publishNotificationsTasks = new HashMap<>();
151     private ListenerRegistration<IdIntsDOMDataTreeLIstener> ddtlReg;
152     private IdIntsDOMDataTreeLIstener idIntsDdtl;
153
154
155
156     public MdsalLowLevelTestProvider(final RpcProviderRegistry rpcRegistry,
157                                      final DOMRpcProviderService domRpcService,
158                                      final ClusterSingletonServiceProvider singletonService,
159                                      final SchemaService schemaService,
160                                      final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer,
161                                      final NotificationPublishService notificationPublishService,
162                                      final NotificationService notificationService,
163                                      final DOMDataBroker domDataBroker,
164                                      final DOMDataTreeService domDataTreeService,
165                                      final DistributedShardFactory distributedShardFactory,
166                                      final DistributedDataStoreInterface configDataStore,
167                                      final ActorSystemProvider actorSystemProvider) {
168         this.rpcRegistry = rpcRegistry;
169         this.domRpcService = domRpcService;
170         this.singletonService = singletonService;
171         this.schemaService = schemaService;
172         this.bindingNormalizedNodeSerializer = bindingNormalizedNodeSerializer;
173         this.notificationPublishService = notificationPublishService;
174         this.notificationService = notificationService;
175         this.domDataBroker = domDataBroker;
176         this.domDataTreeService = domDataTreeService;
177         this.distributedShardFactory = distributedShardFactory;
178         this.configDataStore = configDataStore;
179         this.actorSystem = actorSystemProvider.getActorSystem();
180
181         this.prefixLeaderHandler = new PrefixLeaderHandler(domDataTreeService, bindingNormalizedNodeSerializer);
182
183         domDataTreeChangeService =
184                 (DOMDataTreeChangeService) domDataBroker.getSupportedExtensions().get(DOMDataTreeChangeService.class);
185
186         registration = rpcRegistry.addRpcImplementation(OdlMdsalLowlevelControlService.class, this);
187
188         prefixShardHandler = new PrefixShardHandler(distributedShardFactory, domDataTreeService,
189                 bindingNormalizedNodeSerializer);
190     }
191
192     @Override
193     public Future<RpcResult<Void>> unregisterSingletonConstant() {
194         LOG.debug("unregister-singleton-constant");
195
196         if (getSingletonConstantRegistration == null) {
197             LOG.debug("No get-singleton-constant registration present.");
198             final RpcError rpcError = RpcResultBuilder
199                     .newError(ErrorType.APPLICATION, "missing-registration", "No get-singleton-constant rpc registration present.");
200             final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
201             return Futures.immediateFuture(result);
202         }
203
204         try {
205             getSingletonConstantRegistration.close();
206             getSingletonConstantRegistration = null;
207
208             return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
209         } catch (final Exception e) {
210             LOG.debug("There was a problem closing the singleton constant service", e);
211             final RpcError rpcError = RpcResultBuilder
212                     .newError(ErrorType.APPLICATION, "error-closing", "There was a problem closing get-singleton-constant");
213             final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
214             return Futures.immediateFuture(result);
215         }
216     }
217
218     @Override
219     public Future<RpcResult<Void>> startPublishNotifications(final StartPublishNotificationsInput input) {
220         LOG.debug("publish-notifications, input: {}", input);
221
222         final PublishNotificationsTask task = new PublishNotificationsTask(notificationPublishService, input.getId(),
223                 input.getSeconds(), input.getNotificationsPerSecond());
224
225         publishNotificationsTasks.put(input.getId(), task);
226
227         task.start();
228
229         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
230     }
231
232     @Override
233     public Future<RpcResult<Void>> subscribeDtcl() {
234
235         if (dtclReg != null) {
236             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
237                     "There is already dataTreeChangeListener registered on id-ints list.");
238             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
239         }
240
241         idIntsListener = new IdIntsListener();
242
243         dtclReg = domDataTreeChangeService
244                 .registerDataTreeChangeListener(
245                         new org.opendaylight.controller.md.sal.dom.api.DOMDataTreeIdentifier(
246                                 CONTROLLER_CONFIG, WriteTransactionsHandler.ID_INT_YID),
247                         idIntsListener);
248
249         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
250     }
251
252     @Override
253     public Future<RpcResult<WriteTransactionsOutput>> writeTransactions(final WriteTransactionsInput input) {
254         LOG.debug("write-transactions, input: {}", input);
255         return WriteTransactionsHandler.start(domDataBroker, input);
256     }
257
258     @Override
259     public Future<RpcResult<IsClientAbortedOutput>> isClientAborted() {
260         return null;
261     }
262
263     @Override
264     public Future<RpcResult<Void>> removeShardReplica(final RemoveShardReplicaInput input) {
265         return null;
266     }
267
268     @Override
269     public Future<RpcResult<Void>> subscribeYnl(final SubscribeYnlInput input) {
270
271         LOG.debug("subscribe-ynl, input: {}", input);
272
273         if (ynlRegistrations.containsKey(input.getId())) {
274             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
275                     "There is already ynl listener registered for this id: " + input.getId());
276             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
277         }
278
279         ynlRegistrations.put(input.getId(),
280                 notificationService.registerNotificationListener(new YnlListener(input.getId())));
281
282         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
283     }
284
285     @Override
286     public Future<RpcResult<Void>> removePrefixShard(final RemovePrefixShardInput input) {
287         LOG.debug("remove-prefix-shard, input: {}", input);
288
289         return prefixShardHandler.onRemovePrefixShard(input);
290     }
291
292     @Override
293     public Future<RpcResult<Void>> becomePrefixLeader(final BecomePrefixLeaderInput input) {
294         LOG.debug("become-prefix-leader, input: {}", input);
295
296         return prefixLeaderHandler.makeLeaderLocal(input);
297     }
298
299     @Override
300     public Future<RpcResult<Void>> unregisterBoundConstant(final UnregisterBoundConstantInput input) {
301         LOG.debug("unregister-bound-constant, {}", input);
302
303         final DOMRpcImplementationRegistration<RoutedGetConstantService> registration =
304                 routedRegistrations.remove(input.getContext());
305
306         if (registration == null) {
307             LOG.debug("No get-contexted-constant registration for context: {}", input.getContext());
308             final RpcError rpcError = RpcResultBuilder
309                     .newError(ErrorType.APPLICATION, "missing-registration", "No get-constant rpc registration present.");
310             final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
311             return Futures.immediateFuture(result);
312         }
313
314         registration.close();
315         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
316     }
317
318     @Override
319     public Future<RpcResult<Void>> registerSingletonConstant(final RegisterSingletonConstantInput input) {
320
321         LOG.debug("Received register-singleton-constant rpc, input: {}", input);
322
323         if (input.getConstant() == null) {
324             final RpcError error = RpcResultBuilder.newError(
325                     ErrorType.RPC, "Invalid input.", "Constant value is null");
326             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
327         }
328
329         getSingletonConstantRegistration =
330                 SingletonGetConstantService.registerNew(singletonService, domRpcService, input.getConstant());
331
332         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
333     }
334
335     @Override
336     public Future<RpcResult<Void>> registerDefaultConstant(final RegisterDefaultConstantInput input) {
337         return null;
338     }
339
340     @Override
341     public Future<RpcResult<Void>> unregisterConstant() {
342
343         if (globalGetConstantRegistration == null) {
344             final RpcError rpcError = RpcResultBuilder
345                     .newError(ErrorType.APPLICATION, "missing-registration", "No get-constant rpc registration present.");
346             final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
347             return Futures.immediateFuture(result);
348         }
349
350         globalGetConstantRegistration.close();
351         globalGetConstantRegistration = null;
352
353         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
354     }
355
356     @Override
357     public Future<RpcResult<UnregisterFlappingSingletonOutput>> unregisterFlappingSingleton() {
358         LOG.debug("unregister-flapping-singleton received.");
359
360         if (flappingSingletonService == null) {
361             final RpcError rpcError = RpcResultBuilder
362                     .newError(ErrorType.APPLICATION, "missing-registration", "No flapping-singleton registration present.");
363             final RpcResult<UnregisterFlappingSingletonOutput> result =
364                     RpcResultBuilder.<UnregisterFlappingSingletonOutput>failed().withRpcError(rpcError).build();
365             return Futures.immediateFuture(result);
366         }
367
368         final long flapCount = flappingSingletonService.setInactive();
369         flappingSingletonService = null;
370
371         final UnregisterFlappingSingletonOutput output =
372                 new UnregisterFlappingSingletonOutputBuilder().setFlapCount(flapCount).build();
373
374         return Futures.immediateFuture(RpcResultBuilder.success(output).build());
375     }
376
377     @Override
378     public Future<RpcResult<Void>> addShardReplica(final AddShardReplicaInput input) {
379         return null;
380     }
381
382     @Override
383     public Future<RpcResult<Void>> subscribeDdtl() {
384
385         if (ddtlReg != null) {
386             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
387                     "There is already dataTreeChangeListener registered on id-ints list.");
388             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
389         }
390
391         idIntsDdtl = new IdIntsDOMDataTreeLIstener();
392
393         try {
394             ddtlReg =
395                     domDataTreeService.registerListener(idIntsDdtl,
396                             Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION,
397                                     ProduceTransactionsHandler.ID_INT_YID))
398                             , true, Collections.emptyList());
399         } catch (DOMDataTreeLoopException e) {
400             LOG.error("Failed to register DOMDataTreeListener.", e);
401
402         }
403
404         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
405     }
406
407     @Override
408     public Future<RpcResult<Void>> registerBoundConstant(final RegisterBoundConstantInput input) {
409         LOG.debug("register-bound-constant: {}", input);
410
411         if (input.getContext() == null) {
412             final RpcError error = RpcResultBuilder.newError(
413                     ErrorType.RPC, "Invalid input.", "Context value is null");
414             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
415         }
416
417         if (input.getConstant() == null) {
418             final RpcError error = RpcResultBuilder.newError(
419                     ErrorType.RPC, "Invalid input.", "Constant value is null");
420             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
421         }
422
423         if (routedRegistrations.containsKey(input.getContext())) {
424             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
425                     "There is already a rpc registered for context: " + input.getContext());
426             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
427         }
428
429         final DOMRpcImplementationRegistration<RoutedGetConstantService> registration =
430                 RoutedGetConstantService.registerNew(bindingNormalizedNodeSerializer, domRpcService,
431                         input.getConstant(), input.getContext());
432
433         routedRegistrations.put(input.getContext(), registration);
434         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
435     }
436
437     @Override
438     public Future<RpcResult<Void>> registerFlappingSingleton() {
439         LOG.debug("Received register-flapping-singleton.");
440
441         if (flappingSingletonService != null) {
442             final RpcError error = RpcResultBuilder.newError(
443                     ErrorType.RPC, "Registration present.", "flapping-singleton already registered");
444             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
445         }
446
447         flappingSingletonService = new FlappingSingletonService(singletonService);
448
449         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
450     }
451
452     @Override
453     public Future<RpcResult<UnsubscribeDtclOutput>> unsubscribeDtcl() {
454         LOG.debug("Received unsubscribe-dtcl");
455
456         if (idIntsListener == null || dtclReg == null) {
457             final RpcError error = RpcResultBuilder.newError(
458                     ErrorType.RPC, "Dtcl missing.", "No DataTreeChangeListener registered.");
459             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed().withRpcError(error).build());
460         }
461
462         try {
463             idIntsListener.tryFinishProcessing().get(120, TimeUnit.SECONDS);
464         } catch (InterruptedException | ExecutionException | TimeoutException e) {
465             final RpcError error = RpcResultBuilder.newError(
466                     ErrorType.RPC, "resource-denied-transport", "Unable to finish notification processing in 120 seconds.",
467                     "clustering-it", "clustering-it", e);
468             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
469                     .withRpcError(error).build());
470         }
471
472         dtclReg.close();
473         dtclReg = null;
474
475         if (!idIntsListener.hasTriggered()) {
476             final RpcError error = RpcResultBuilder.newError(
477                     ErrorType.APPLICATION, "No notification received.", "id-ints listener has not received" +
478                             "any notifications.");
479             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
480                     .withRpcError(error).build());
481         }
482
483         final DOMDataReadOnlyTransaction rTx = domDataBroker.newReadOnlyTransaction();
484         try {
485             final Optional<NormalizedNode<?, ?>> readResult =
486                     rTx.read(CONTROLLER_CONFIG, WriteTransactionsHandler.ID_INT_YID).checkedGet();
487
488             if (!readResult.isPresent()) {
489                 final RpcError error = RpcResultBuilder.newError(
490                         ErrorType.APPLICATION, "Final read empty.", "No data read from id-ints list.");
491                 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
492                         .withRpcError(error).build());
493             }
494
495             return Futures.immediateFuture(
496                     RpcResultBuilder.success(new UnsubscribeDtclOutputBuilder()
497                             .setCopyMatches(idIntsListener.checkEqual(readResult.get()))).build());
498
499         } catch (final ReadFailedException e) {
500             final RpcError error = RpcResultBuilder.newError(
501                     ErrorType.APPLICATION, "Read failed.", "Final read from id-ints failed.");
502             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
503                     .withRpcError(error).build());
504
505         }
506     }
507
508     @Override
509     public Future<RpcResult<Void>> createPrefixShard(final CreatePrefixShardInput input) {
510         LOG.debug("create-prefix-shard, input: {}", input);
511
512         return prefixShardHandler.onCreatePrefixShard(input);
513     }
514
515     @Override
516     public Future<RpcResult<Void>> deconfigureIdIntsShard() {
517         return null;
518     }
519
520     @Override
521     public Future<RpcResult<UnsubscribeYnlOutput>> unsubscribeYnl(final UnsubscribeYnlInput input) {
522         LOG.debug("Received unsubscribe-ynl, input: {}", input);
523
524         if (!ynlRegistrations.containsKey(input.getId())) {
525             final RpcError rpcError = RpcResultBuilder
526                     .newError(ErrorType.APPLICATION, "missing-registration", "No ynl listener with this id registered.");
527             final RpcResult<UnsubscribeYnlOutput> result =
528                     RpcResultBuilder.<UnsubscribeYnlOutput>failed().withRpcError(rpcError).build();
529             return Futures.immediateFuture(result);
530         }
531
532         final ListenerRegistration<YnlListener> registration = ynlRegistrations.remove(input.getId());
533         final UnsubscribeYnlOutput output = registration.getInstance().getOutput();
534
535         registration.close();
536
537         return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeYnlOutput>success().withResult(output).build());
538     }
539
540     @Override
541     public Future<RpcResult<CheckPublishNotificationsOutput>> checkPublishNotifications(
542             final CheckPublishNotificationsInput input) {
543
544         final PublishNotificationsTask task = publishNotificationsTasks.get(input.getId());
545
546         if (task == null) {
547             return Futures.immediateFuture(RpcResultBuilder.success(
548                     new CheckPublishNotificationsOutputBuilder().setActive(false)).build());
549         }
550
551         final CheckPublishNotificationsOutputBuilder checkPublishNotificationsOutputBuilder =
552                 new CheckPublishNotificationsOutputBuilder().setActive(!task.isFinished());
553
554         if (task.getLastError() != null) {
555             final StringWriter sw = new StringWriter();
556             final PrintWriter pw = new PrintWriter(sw);
557             task.getLastError().printStackTrace(pw);
558             checkPublishNotificationsOutputBuilder.setLastError(task.getLastError().toString() + sw.toString());
559         }
560
561         final CheckPublishNotificationsOutput output =
562                 checkPublishNotificationsOutputBuilder.setPublishCount(task.getCurrentNotif()).build();
563
564         return Futures.immediateFuture(RpcResultBuilder.success(output).build());
565     }
566
567     @Override
568     public Future<RpcResult<ProduceTransactionsOutput>> produceTransactions(final ProduceTransactionsInput input) {
569         LOG.debug("producer-transactions, input: {}", input);
570         return ProduceTransactionsHandler.start(domDataTreeService, input);
571     }
572
573     @Override
574     public Future<RpcResult<Void>> shutdownShardReplica(final ShutdownShardReplicaInput input) {
575         LOG.debug("Received shutdown-shard-replica rpc, input: {}", input);
576
577         final String shardName = input.getShardName();
578         if (Strings.isNullOrEmpty(shardName)) {
579             final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "bad-element",
580                     "A valid shard name must be specified");
581             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(rpcError).build());
582         }
583
584         return shutdownShardGracefully(shardName);
585     }
586
587     @Override
588     public Future<RpcResult<Void>> shutdownPrefixShardReplica(final ShutdownPrefixShardReplicaInput input) {
589         LOG.debug("Received shutdown-prefix-shard-replica rpc, input: {}", input);
590
591         final InstanceIdentifier<?> shardPrefix = input.getPrefix();
592
593         if (shardPrefix == null) {
594             final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "bad-element",
595                     "A valid shard prefix must be specified");
596             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(rpcError).build());
597         }
598
599         final YangInstanceIdentifier shardPath = bindingNormalizedNodeSerializer.toYangInstanceIdentifier(shardPrefix);
600         final String cleanPrefixShardName = ClusterUtils.getCleanShardName(shardPath);
601
602         return shutdownShardGracefully(cleanPrefixShardName);
603     }
604
605     private SettableFuture<RpcResult<Void>> shutdownShardGracefully(final String shardName) {
606         final SettableFuture<RpcResult<Void>> rpcResult = SettableFuture.create();
607         final ActorContext context = configDataStore.getActorContext();
608
609         long timeoutInMS = Math.max(context.getDatastoreContext().getShardRaftConfig()
610                 .getElectionTimeOutInterval().$times(3).toMillis(), 10000);
611         final FiniteDuration duration = FiniteDuration.apply(timeoutInMS, TimeUnit.MILLISECONDS);
612         final scala.concurrent.Promise<Boolean> shutdownShardAsk = akka.dispatch.Futures.promise();
613
614         context.findLocalShardAsync(shardName).onComplete(new OnComplete<ActorRef>() {
615             @Override
616             public void onComplete(final Throwable throwable, final ActorRef actorRef) throws Throwable {
617                 if (throwable != null) {
618                     shutdownShardAsk.failure(throwable);
619                 } else {
620                     shutdownShardAsk.completeWith(Patterns.gracefulStop(actorRef, duration, Shutdown.INSTANCE));
621                 }
622             }
623         }, context.getClientDispatcher());
624
625         shutdownShardAsk.future().onComplete(new OnComplete<Boolean>() {
626             @Override
627             public void onComplete(final Throwable throwable, final Boolean gracefulStopResult) throws Throwable {
628                 if (throwable != null) {
629                     final RpcResult<Void> failedResult = RpcResultBuilder.<Void>failed()
630                             .withError(ErrorType.APPLICATION, "Failed to gracefully shutdown shard", throwable).build();
631                     rpcResult.set(failedResult);
632                 } else {
633                     // according to Patterns.gracefulStop API, we don't have to
634                     // check value of gracefulStopResult
635                     rpcResult.set(RpcResultBuilder.<Void>success().build());
636                 }
637             }
638         }, context.getClientDispatcher());
639         return rpcResult;
640     }
641
642     @Override
643     public Future<RpcResult<Void>> registerConstant(final RegisterConstantInput input) {
644
645         LOG.debug("Received register-constant rpc, input: {}", input);
646
647         if (input.getConstant() == null) {
648             final RpcError error = RpcResultBuilder.newError(
649                     ErrorType.RPC, "Invalid input.", "Constant value is null");
650             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
651         }
652
653         if (globalGetConstantRegistration != null) {
654             final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
655                     "There is already a get-constant rpc registered.");
656             return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
657         }
658
659         globalGetConstantRegistration = GetConstantService.registerNew(domRpcService, input.getConstant());
660         return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
661     }
662
663     @Override
664     public Future<RpcResult<Void>> unregisterDefaultConstant() {
665         return null;
666     }
667
668     @Override
669     public Future<RpcResult<UnsubscribeDdtlOutput>> unsubscribeDdtl() {
670         LOG.debug("Received unsubscribe-ddtl.");
671
672         if (idIntsDdtl == null || ddtlReg == null) {
673             final RpcError error = RpcResultBuilder.newError(
674                     ErrorType.RPC, "Ddtl missing.", "No DOMDataTreeListener registered.");
675             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed().withRpcError(error).build());
676         }
677
678         try {
679             idIntsDdtl.tryFinishProcessing().get(120, TimeUnit.SECONDS);
680         } catch (InterruptedException | ExecutionException | TimeoutException e) {
681             final RpcError error = RpcResultBuilder.newError(
682                     ErrorType.RPC, "resource-denied-transport", "Unable to finish notification processing in 120 seconds.",
683                     "clustering-it", "clustering-it", e);
684             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
685                     .withRpcError(error).build());
686         }
687
688         ddtlReg.close();
689         ddtlReg = null;
690
691         if (!idIntsDdtl.hasTriggered()) {
692             final RpcError error = RpcResultBuilder.newError(
693                     ErrorType.APPLICATION, "No notification received.", "id-ints listener has not received" +
694                             "any notifications.");
695             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
696                     .withRpcError(error).build());
697         }
698
699         final String shardName = ClusterUtils.getCleanShardName(ProduceTransactionsHandler.ID_INTS_YID);
700         LOG.debug("Creating distributed datastore client for shard {}", shardName);
701
702         final ActorContext actorContext = configDataStore.getActorContext();
703         final Props distributedDataStoreClientProps =
704                 SimpleDataStoreClientActor.props(actorContext.getCurrentMemberName(),
705                         "Shard-" + shardName, actorContext, shardName);
706
707         final ActorRef clientActor = actorSystem.actorOf(distributedDataStoreClientProps);
708         final DataStoreClient distributedDataStoreClient;
709         try {
710             distributedDataStoreClient = SimpleDataStoreClientActor
711                     .getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS);
712         } catch (final Exception e) {
713             LOG.error("Failed to get actor for {}", distributedDataStoreClientProps, e);
714             clientActor.tell(PoisonPill.getInstance(), noSender());
715             final RpcError error = RpcResultBuilder.newError(
716                     ErrorType.APPLICATION, "Unable to create ds client for read.",
717                     "Unable to create ds client for read.");
718             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
719                     .withRpcError(error).build());
720         }
721
722         final ClientLocalHistory localHistory = distributedDataStoreClient.createLocalHistory();
723         final ClientTransaction tx = localHistory.createTransaction();
724         final CheckedFuture<Optional<NormalizedNode<?, ?>>,
725                 org.opendaylight.mdsal.common.api.ReadFailedException> read =
726                 tx.read(YangInstanceIdentifier.of(ProduceTransactionsHandler.ID_INT));
727
728         tx.abort();
729         localHistory.close();
730         try {
731             final Optional<NormalizedNode<?, ?>> optional = read.checkedGet();
732             if (!optional.isPresent()) {
733                 LOG.warn("Final read from client is empty.");
734                 final RpcError error = RpcResultBuilder.newError(
735                         ErrorType.APPLICATION, "Read failed.", "Final read from id-ints is empty.");
736                 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
737                         .withRpcError(error).build());
738             }
739
740             return Futures.immediateFuture(
741                     RpcResultBuilder.success(new UnsubscribeDdtlOutputBuilder()
742                             .setCopyMatches(idIntsDdtl.checkEqual(optional.get()))).build());
743
744         } catch (org.opendaylight.mdsal.common.api.ReadFailedException e) {
745             LOG.error("Unable to read data to verify ddtl data.", e);
746             final RpcError error = RpcResultBuilder.newError(
747                     ErrorType.APPLICATION, "Read failed.", "Final read from id-ints failed.");
748             return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
749                     .withRpcError(error).build());
750         } finally {
751             distributedDataStoreClient.close();
752             clientActor.tell(PoisonPill.getInstance(), noSender());
753         }
754     }
755 }