Remove prefix shard leftovers
[controller.git] / opendaylight / md-sal / samples / clustering-test-app / provider / src / main / java / org / opendaylight / controller / clustering / it / provider / MdsalLowLevelTestProvider.java
1 /*
2  * Copyright (c) 2017 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.clustering.it.provider;
9
10 import akka.actor.ActorRef;
11 import akka.actor.ActorSystem;
12 import akka.dispatch.OnComplete;
13 import akka.pattern.Patterns;
14 import com.google.common.base.Strings;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import com.google.common.util.concurrent.SettableFuture;
18 import java.util.HashMap;
19 import java.util.Map;
20 import java.util.Optional;
21 import java.util.concurrent.ExecutionException;
22 import java.util.concurrent.TimeUnit;
23 import java.util.concurrent.TimeoutException;
24 import org.opendaylight.controller.cluster.ActorSystemProvider;
25 import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
26 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
27 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
28 import org.opendaylight.controller.clustering.it.provider.impl.FlappingSingletonService;
29 import org.opendaylight.controller.clustering.it.provider.impl.GetConstantService;
30 import org.opendaylight.controller.clustering.it.provider.impl.IdIntsListener;
31 import org.opendaylight.controller.clustering.it.provider.impl.PublishNotificationsTask;
32 import org.opendaylight.controller.clustering.it.provider.impl.RoutedGetConstantService;
33 import org.opendaylight.controller.clustering.it.provider.impl.SingletonGetConstantService;
34 import org.opendaylight.controller.clustering.it.provider.impl.WriteTransactionsHandler;
35 import org.opendaylight.controller.clustering.it.provider.impl.YnlListener;
36 import org.opendaylight.mdsal.binding.api.NotificationPublishService;
37 import org.opendaylight.mdsal.binding.api.NotificationService;
38 import org.opendaylight.mdsal.binding.api.RpcProviderService;
39 import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
40 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
41 import org.opendaylight.mdsal.dom.api.DOMDataBroker;
42 import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
43 import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeService;
44 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
45 import org.opendaylight.mdsal.dom.api.DOMDataTreeReadTransaction;
46 import org.opendaylight.mdsal.dom.api.DOMRpcImplementationRegistration;
47 import org.opendaylight.mdsal.dom.api.DOMRpcProviderService;
48 import org.opendaylight.mdsal.dom.api.DOMSchemaService;
49 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
50 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
51 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.AddShardReplicaInput;
52 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.AddShardReplicaOutput;
53 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsInput;
54 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutput;
55 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutputBuilder;
56 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.IsClientAbortedInput;
57 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.IsClientAbortedOutput;
58 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.OdlMdsalLowlevelControlService;
59 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantInput;
60 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantOutput;
61 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantOutputBuilder;
62 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantInput;
63 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantOutput;
64 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantOutputBuilder;
65 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterDefaultConstantInput;
66 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterDefaultConstantOutput;
67 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterFlappingSingletonInput;
68 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterFlappingSingletonOutput;
69 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterFlappingSingletonOutputBuilder;
70 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantInput;
71 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantOutput;
72 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantOutputBuilder;
73 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemoveShardReplicaInput;
74 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemoveShardReplicaOutput;
75 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownShardReplicaInput;
76 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownShardReplicaOutput;
77 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownShardReplicaOutputBuilder;
78 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsInput;
79 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsOutput;
80 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsOutputBuilder;
81 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDdtlInput;
82 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDdtlOutput;
83 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDtclInput;
84 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDtclOutput;
85 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDtclOutputBuilder;
86 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlInput;
87 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlOutput;
88 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlOutputBuilder;
89 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantInput;
90 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantOutput;
91 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantOutputBuilder;
92 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterConstantInput;
93 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterConstantOutput;
94 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterConstantOutputBuilder;
95 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterDefaultConstantInput;
96 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterDefaultConstantOutput;
97 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonInput;
98 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutput;
99 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutputBuilder;
100 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterSingletonConstantInput;
101 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterSingletonConstantOutput;
102 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterSingletonConstantOutputBuilder;
103 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlInput;
104 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutput;
105 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclInput;
106 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutput;
107 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutputBuilder;
108 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlInput;
109 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlOutput;
110 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsInput;
111 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsOutput;
112 import org.opendaylight.yangtools.concepts.ListenerRegistration;
113 import org.opendaylight.yangtools.concepts.ObjectRegistration;
114 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
115 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
116 import org.opendaylight.yangtools.yang.common.RpcResult;
117 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
118 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
119 import org.slf4j.Logger;
120 import org.slf4j.LoggerFactory;
121 import scala.concurrent.duration.FiniteDuration;
122
123 public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService {
124     private static final Logger LOG = LoggerFactory.getLogger(MdsalLowLevelTestProvider.class);
125
126     private final RpcProviderService rpcRegistry;
127     private final ObjectRegistration<OdlMdsalLowlevelControlService> registration;
128     private final DistributedDataStoreInterface configDataStore;
129     private final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer;
130     private final DOMDataBroker domDataBroker;
131     private final NotificationPublishService notificationPublishService;
132     private final NotificationService notificationService;
133     private final DOMSchemaService schemaService;
134     private final ClusterSingletonServiceProvider singletonService;
135     private final DOMRpcProviderService domRpcService;
136     private final DOMDataTreeChangeService domDataTreeChangeService;
137     private final ActorSystem actorSystem;
138
139     private final Map<InstanceIdentifier<?>, DOMRpcImplementationRegistration<RoutedGetConstantService>>
140             routedRegistrations = new HashMap<>();
141
142     private final Map<String, ListenerRegistration<YnlListener>> ynlRegistrations = new HashMap<>();
143
144     private DOMRpcImplementationRegistration<GetConstantService> globalGetConstantRegistration = null;
145     private ClusterSingletonServiceRegistration getSingletonConstantRegistration;
146     private FlappingSingletonService flappingSingletonService;
147     private ListenerRegistration<DOMDataTreeChangeListener> dtclReg;
148     private IdIntsListener idIntsListener;
149     private final Map<String, PublishNotificationsTask> publishNotificationsTasks = new HashMap<>();
150
151     public MdsalLowLevelTestProvider(final RpcProviderService rpcRegistry,
152                                      final DOMRpcProviderService domRpcService,
153                                      final ClusterSingletonServiceProvider singletonService,
154                                      final DOMSchemaService schemaService,
155                                      final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer,
156                                      final NotificationPublishService notificationPublishService,
157                                      final NotificationService notificationService,
158                                      final DOMDataBroker domDataBroker,
159                                      final DistributedDataStoreInterface configDataStore,
160                                      final ActorSystemProvider actorSystemProvider) {
161         this.rpcRegistry = rpcRegistry;
162         this.domRpcService = domRpcService;
163         this.singletonService = singletonService;
164         this.schemaService = schemaService;
165         this.bindingNormalizedNodeSerializer = bindingNormalizedNodeSerializer;
166         this.notificationPublishService = notificationPublishService;
167         this.notificationService = notificationService;
168         this.domDataBroker = domDataBroker;
169         this.configDataStore = configDataStore;
170         this.actorSystem = actorSystemProvider.getActorSystem();
171
172         domDataTreeChangeService = domDataBroker.getExtensions().getInstance(DOMDataTreeChangeService.class);
173
174         registration = rpcRegistry.registerRpcImplementation(OdlMdsalLowlevelControlService.class, this);
175     }
176
177     @Override
178     @SuppressWarnings("checkstyle:IllegalCatch")
179     public ListenableFuture<RpcResult<UnregisterSingletonConstantOutput>> unregisterSingletonConstant(
180             final UnregisterSingletonConstantInput input) {
181         LOG.info("In unregisterSingletonConstant");
182
183         if (getSingletonConstantRegistration == null) {
184             return RpcResultBuilder.<UnregisterSingletonConstantOutput>failed().withError(ErrorType.RPC, "data-missing",
185                     "No prior RPC was registered").buildFuture();
186         }
187
188         try {
189             getSingletonConstantRegistration.close();
190             getSingletonConstantRegistration = null;
191
192             return RpcResultBuilder.success(new UnregisterSingletonConstantOutputBuilder().build()).buildFuture();
193         } catch (Exception e) {
194             String msg = "Error closing the singleton constant service";
195             LOG.error(msg, e);
196             return RpcResultBuilder.<UnregisterSingletonConstantOutput>failed().withError(
197                     ErrorType.APPLICATION, msg, e).buildFuture();
198         }
199     }
200
201     @Override
202     public ListenableFuture<RpcResult<StartPublishNotificationsOutput>> startPublishNotifications(
203             final StartPublishNotificationsInput input) {
204         LOG.info("In startPublishNotifications - input: {}", input);
205
206         final PublishNotificationsTask task = new PublishNotificationsTask(notificationPublishService, input.getId(),
207                 input.getSeconds().toJava(), input.getNotificationsPerSecond().toJava());
208
209         publishNotificationsTasks.put(input.getId(), task);
210
211         task.start();
212
213         return RpcResultBuilder.success(new StartPublishNotificationsOutputBuilder().build()).buildFuture();
214     }
215
216     @Override
217     public ListenableFuture<RpcResult<SubscribeDtclOutput>> subscribeDtcl(final SubscribeDtclInput input) {
218         LOG.info("In subscribeDtcl - input: {}", input);
219
220         if (dtclReg != null) {
221             return RpcResultBuilder.<SubscribeDtclOutput>failed().withError(ErrorType.RPC,
222                 "data-exists", "There is already a DataTreeChangeListener registered for id-ints").buildFuture();
223         }
224
225         idIntsListener = new IdIntsListener();
226
227         dtclReg = domDataTreeChangeService.registerDataTreeChangeListener(
228             new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, WriteTransactionsHandler.ID_INT_YID),
229             idIntsListener);
230
231         return RpcResultBuilder.success(new SubscribeDtclOutputBuilder().build()).buildFuture();
232     }
233
234     @Override
235     public ListenableFuture<RpcResult<WriteTransactionsOutput>> writeTransactions(final WriteTransactionsInput input) {
236         return WriteTransactionsHandler.start(domDataBroker, input);
237     }
238
239     @Override
240     public ListenableFuture<RpcResult<IsClientAbortedOutput>> isClientAborted(final IsClientAbortedInput input) {
241         return null;
242     }
243
244     @Override
245     public ListenableFuture<RpcResult<RemoveShardReplicaOutput>> removeShardReplica(
246             final RemoveShardReplicaInput input) {
247         return null;
248     }
249
250     @Override
251     public ListenableFuture<RpcResult<SubscribeYnlOutput>> subscribeYnl(final SubscribeYnlInput input) {
252         LOG.info("In subscribeYnl - input: {}", input);
253
254         if (ynlRegistrations.containsKey(input.getId())) {
255             return RpcResultBuilder.<SubscribeYnlOutput>failed().withError(ErrorType.RPC,
256                 "data-exists", "There is already a listener registered for id: " + input.getId()).buildFuture();
257         }
258
259         ynlRegistrations.put(input.getId(),
260                 notificationService.registerNotificationListener(new YnlListener(input.getId())));
261
262         return RpcResultBuilder.success(new SubscribeYnlOutputBuilder().build()).buildFuture();
263     }
264
265
266     @Override
267     public ListenableFuture<RpcResult<UnregisterBoundConstantOutput>> unregisterBoundConstant(
268             final UnregisterBoundConstantInput input) {
269         LOG.info("In unregisterBoundConstant - {}", input);
270
271         final DOMRpcImplementationRegistration<RoutedGetConstantService> rpcRegistration =
272                 routedRegistrations.remove(input.getContext());
273
274         if (rpcRegistration == null) {
275             return RpcResultBuilder.<UnregisterBoundConstantOutput>failed().withError(
276                 ErrorType.RPC, "data-missing", "No prior RPC was registered for " + input.getContext()).buildFuture();
277         }
278
279         rpcRegistration.close();
280         return RpcResultBuilder.success(new UnregisterBoundConstantOutputBuilder().build()).buildFuture();
281     }
282
283     @Override
284     public ListenableFuture<RpcResult<RegisterSingletonConstantOutput>> registerSingletonConstant(
285             final RegisterSingletonConstantInput input) {
286         LOG.info("In registerSingletonConstant - input: {}", input);
287
288         if (input.getConstant() == null) {
289             return RpcResultBuilder.<RegisterSingletonConstantOutput>failed().withError(
290                     ErrorType.RPC, "invalid-value", "Constant value is null").buildFuture();
291         }
292
293         getSingletonConstantRegistration =
294                 SingletonGetConstantService.registerNew(singletonService, domRpcService, input.getConstant());
295
296         return RpcResultBuilder.success(new RegisterSingletonConstantOutputBuilder().build()).buildFuture();
297     }
298
299     @Override
300     public ListenableFuture<RpcResult<RegisterDefaultConstantOutput>> registerDefaultConstant(
301             final RegisterDefaultConstantInput input) {
302         return null;
303     }
304
305     @Override
306     public ListenableFuture<RpcResult<UnregisterConstantOutput>> unregisterConstant(
307             final UnregisterConstantInput input) {
308         LOG.info("In unregisterConstant");
309
310         if (globalGetConstantRegistration == null) {
311             return RpcResultBuilder.<UnregisterConstantOutput>failed().withError(
312                 ErrorType.RPC, "data-missing", "No prior RPC was registered").buildFuture();
313         }
314
315         globalGetConstantRegistration.close();
316         globalGetConstantRegistration = null;
317
318         return Futures.immediateFuture(RpcResultBuilder.success(new UnregisterConstantOutputBuilder().build()).build());
319     }
320
321     @Override
322     public ListenableFuture<RpcResult<UnregisterFlappingSingletonOutput>> unregisterFlappingSingleton(
323             final UnregisterFlappingSingletonInput input) {
324         LOG.info("In unregisterFlappingSingleton");
325
326         if (flappingSingletonService == null) {
327             return RpcResultBuilder.<UnregisterFlappingSingletonOutput>failed().withError(
328                 ErrorType.RPC, "data-missing", "No prior RPC was registered").buildFuture();
329         }
330
331         final long flapCount = flappingSingletonService.setInactive();
332         flappingSingletonService = null;
333
334         return RpcResultBuilder.success(new UnregisterFlappingSingletonOutputBuilder().setFlapCount(flapCount).build())
335                 .buildFuture();
336     }
337
338     @Override
339     public ListenableFuture<RpcResult<AddShardReplicaOutput>> addShardReplica(final AddShardReplicaInput input) {
340         throw new UnsupportedOperationException();
341     }
342
343     @Override
344     public ListenableFuture<RpcResult<SubscribeDdtlOutput>> subscribeDdtl(final SubscribeDdtlInput input) {
345         throw new UnsupportedOperationException();
346     }
347
348     @Override
349     public ListenableFuture<RpcResult<RegisterBoundConstantOutput>> registerBoundConstant(
350             final RegisterBoundConstantInput input) {
351         LOG.info("In registerBoundConstant - input: {}", input);
352
353         if (input.getContext() == null) {
354             return RpcResultBuilder.<RegisterBoundConstantOutput>failed().withError(
355                     ErrorType.RPC, "invalid-value", "Context value is null").buildFuture();
356         }
357
358         if (input.getConstant() == null) {
359             return RpcResultBuilder.<RegisterBoundConstantOutput>failed().withError(
360                     ErrorType.RPC, "invalid-value", "Constant value is null").buildFuture();
361         }
362
363         if (routedRegistrations.containsKey(input.getContext())) {
364             return RpcResultBuilder.<RegisterBoundConstantOutput>failed().withError(ErrorType.RPC,
365                 "data-exists", "There is already an rpc registered for context: " + input.getContext()).buildFuture();
366         }
367
368         final DOMRpcImplementationRegistration<RoutedGetConstantService> rpcRegistration =
369                 RoutedGetConstantService.registerNew(bindingNormalizedNodeSerializer, domRpcService,
370                         input.getConstant(), input.getContext());
371
372         routedRegistrations.put(input.getContext(), rpcRegistration);
373         return RpcResultBuilder.success(new RegisterBoundConstantOutputBuilder().build()).buildFuture();
374     }
375
376     @Override
377     public ListenableFuture<RpcResult<RegisterFlappingSingletonOutput>> registerFlappingSingleton(
378             final RegisterFlappingSingletonInput input) {
379         LOG.info("In registerFlappingSingleton");
380
381         if (flappingSingletonService != null) {
382             return RpcResultBuilder.<RegisterFlappingSingletonOutput>failed().withError(ErrorType.RPC,
383                 "data-exists", "There is already an rpc registered").buildFuture();
384         }
385
386         flappingSingletonService = new FlappingSingletonService(singletonService);
387
388         return RpcResultBuilder.success(new RegisterFlappingSingletonOutputBuilder().build()).buildFuture();
389     }
390
391     @Override
392     public ListenableFuture<RpcResult<UnsubscribeDtclOutput>> unsubscribeDtcl(final UnsubscribeDtclInput input) {
393         LOG.info("In unsubscribeDtcl");
394
395         if (idIntsListener == null || dtclReg == null) {
396             return RpcResultBuilder.<UnsubscribeDtclOutput>failed().withError(
397                     ErrorType.RPC, "data-missing", "No prior listener was registered").buildFuture();
398         }
399
400         long timeout = 120L;
401         try {
402             idIntsListener.tryFinishProcessing().get(timeout, TimeUnit.SECONDS);
403         } catch (InterruptedException | ExecutionException | TimeoutException e) {
404             LOG.error("Unable to finish notification processing", e);
405             return RpcResultBuilder.<UnsubscribeDtclOutput>failed().withError(ErrorType.APPLICATION,
406                     "Unable to finish notification processing in " + timeout + " seconds", e).buildFuture();
407         }
408
409         dtclReg.close();
410         dtclReg = null;
411
412         if (!idIntsListener.hasTriggered()) {
413             return RpcResultBuilder.<UnsubscribeDtclOutput>failed().withError(ErrorType.APPLICATION, "operation-failed",
414                     "id-ints listener has not received any notifications.").buildFuture();
415         }
416
417         try (DOMDataTreeReadTransaction rTx = domDataBroker.newReadOnlyTransaction()) {
418             final Optional<NormalizedNode> readResult = rTx.read(LogicalDatastoreType.CONFIGURATION,
419                 WriteTransactionsHandler.ID_INT_YID).get();
420
421             if (!readResult.isPresent()) {
422                 return RpcResultBuilder.<UnsubscribeDtclOutput>failed().withError(ErrorType.APPLICATION, "data-missing",
423                         "No data read from id-ints list").buildFuture();
424             }
425
426             final boolean nodesEqual = idIntsListener.checkEqual(readResult.get());
427             if (!nodesEqual) {
428                 LOG.error("Final read of id-int does not match IdIntsListener's copy. {}",
429                         idIntsListener.diffWithLocalCopy(readResult.get()));
430             }
431
432             return RpcResultBuilder.success(new UnsubscribeDtclOutputBuilder().setCopyMatches(nodesEqual))
433                     .buildFuture();
434
435         } catch (final InterruptedException | ExecutionException e) {
436             LOG.error("Final read of id-ints failed", e);
437             return RpcResultBuilder.<UnsubscribeDtclOutput>failed().withError(ErrorType.APPLICATION,
438                     "Final read of id-ints failed", e).buildFuture();
439         }
440     }
441
442     @Override
443     public ListenableFuture<RpcResult<UnsubscribeYnlOutput>> unsubscribeYnl(final UnsubscribeYnlInput input) {
444         LOG.info("In unsubscribeYnl - input: {}", input);
445
446         if (!ynlRegistrations.containsKey(input.getId())) {
447             return RpcResultBuilder.<UnsubscribeYnlOutput>failed().withError(
448                 ErrorType.RPC, "data-missing", "No prior listener was registered for " + input.getId()).buildFuture();
449         }
450
451         final ListenerRegistration<YnlListener> reg = ynlRegistrations.remove(input.getId());
452         final UnsubscribeYnlOutput output = reg.getInstance().getOutput();
453
454         reg.close();
455
456         return RpcResultBuilder.<UnsubscribeYnlOutput>success().withResult(output).buildFuture();
457     }
458
459     @Override
460     public ListenableFuture<RpcResult<CheckPublishNotificationsOutput>> checkPublishNotifications(
461             final CheckPublishNotificationsInput input) {
462         LOG.info("In checkPublishNotifications - input: {}", input);
463
464         final PublishNotificationsTask task = publishNotificationsTasks.get(input.getId());
465
466         if (task == null) {
467             return Futures.immediateFuture(RpcResultBuilder.success(
468                     new CheckPublishNotificationsOutputBuilder().setActive(false)).build());
469         }
470
471         final CheckPublishNotificationsOutputBuilder checkPublishNotificationsOutputBuilder =
472                 new CheckPublishNotificationsOutputBuilder().setActive(!task.isFinished());
473
474         if (task.getLastError() != null) {
475             LOG.error("Last error for {}", task, task.getLastError());
476             checkPublishNotificationsOutputBuilder.setLastError(task.getLastError().toString());
477         }
478
479         final CheckPublishNotificationsOutput output =
480                 checkPublishNotificationsOutputBuilder.setPublishCount(task.getCurrentNotif()).build();
481
482         return RpcResultBuilder.success(output).buildFuture();
483     }
484
485     @Override
486     public ListenableFuture<RpcResult<ShutdownShardReplicaOutput>> shutdownShardReplica(
487             final ShutdownShardReplicaInput input) {
488         LOG.info("In shutdownShardReplica - input: {}", input);
489
490         final String shardName = input.getShardName();
491         if (Strings.isNullOrEmpty(shardName)) {
492             return RpcResultBuilder.<ShutdownShardReplicaOutput>failed().withError(ErrorType.RPC, "bad-element",
493                 shardName + "is not a valid shard name").buildFuture();
494         }
495
496         return shutdownShardGracefully(shardName, new ShutdownShardReplicaOutputBuilder().build());
497     }
498
499     private <T> SettableFuture<RpcResult<T>> shutdownShardGracefully(final String shardName, final T success) {
500         final SettableFuture<RpcResult<T>> rpcResult = SettableFuture.create();
501         final ActorUtils context = configDataStore.getActorUtils();
502
503         long timeoutInMS = Math.max(context.getDatastoreContext().getShardRaftConfig()
504                 .getElectionTimeOutInterval().$times(3).toMillis(), 10000);
505         final FiniteDuration duration = FiniteDuration.apply(timeoutInMS, TimeUnit.MILLISECONDS);
506         final scala.concurrent.Promise<Boolean> shutdownShardAsk = akka.dispatch.Futures.promise();
507
508         context.findLocalShardAsync(shardName).onComplete(new OnComplete<ActorRef>() {
509             @Override
510             public void onComplete(final Throwable throwable, final ActorRef actorRef) {
511                 if (throwable != null) {
512                     shutdownShardAsk.failure(throwable);
513                 } else {
514                     shutdownShardAsk.completeWith(Patterns.gracefulStop(actorRef, duration, Shutdown.INSTANCE));
515                 }
516             }
517         }, context.getClientDispatcher());
518
519         shutdownShardAsk.future().onComplete(new OnComplete<Boolean>() {
520             @Override
521             public void onComplete(final Throwable throwable, final Boolean gracefulStopResult) {
522                 if (throwable != null) {
523                     final RpcResult<T> failedResult = RpcResultBuilder.<T>failed()
524                             .withError(ErrorType.APPLICATION, "Failed to gracefully shutdown shard", throwable).build();
525                     rpcResult.set(failedResult);
526                 } else {
527                     // according to Patterns.gracefulStop API, we don't have to
528                     // check value of gracefulStopResult
529                     rpcResult.set(RpcResultBuilder.success(success).build());
530                 }
531             }
532         }, context.getClientDispatcher());
533         return rpcResult;
534     }
535
536     @Override
537     public ListenableFuture<RpcResult<RegisterConstantOutput>> registerConstant(final RegisterConstantInput input) {
538         LOG.info("In registerConstant - input: {}", input);
539
540         if (input.getConstant() == null) {
541             return RpcResultBuilder.<RegisterConstantOutput>failed().withError(
542                     ErrorType.RPC, "invalid-value", "Constant value is null").buildFuture();
543         }
544
545         if (globalGetConstantRegistration != null) {
546             return RpcResultBuilder.<RegisterConstantOutput>failed().withError(ErrorType.RPC,
547                     "data-exists", "There is already an rpc registered").buildFuture();
548         }
549
550         globalGetConstantRegistration = GetConstantService.registerNew(domRpcService, input.getConstant());
551         return RpcResultBuilder.success(new RegisterConstantOutputBuilder().build()).buildFuture();
552     }
553
554     @Override
555     public ListenableFuture<RpcResult<UnregisterDefaultConstantOutput>> unregisterDefaultConstant(
556             final UnregisterDefaultConstantInput input) {
557         throw new UnsupportedOperationException();
558     }
559
560     @Override
561     @SuppressWarnings("checkstyle:IllegalCatch")
562     public ListenableFuture<RpcResult<UnsubscribeDdtlOutput>> unsubscribeDdtl(final UnsubscribeDdtlInput input) {
563         throw new UnsupportedOperationException();
564     }
565 }