dd85b968b743a708e5419b4793bdfef95d1d1e6a
[controller.git] / opendaylight / md-sal / samples / clustering-test-app / provider / src / main / java / org / opendaylight / controller / clustering / it / provider / MdsalLowLevelTestProvider.java
1 /*
2  * Copyright (c) 2017 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.clustering.it.provider;
9
10 import akka.actor.ActorRef;
11 import akka.actor.ActorSystem;
12 import akka.dispatch.OnComplete;
13 import akka.pattern.Patterns;
14 import com.google.common.base.Strings;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import com.google.common.util.concurrent.SettableFuture;
18 import java.util.HashMap;
19 import java.util.Map;
20 import java.util.Optional;
21 import java.util.concurrent.ExecutionException;
22 import java.util.concurrent.TimeUnit;
23 import java.util.concurrent.TimeoutException;
24 import org.opendaylight.controller.cluster.ActorSystemProvider;
25 import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
26 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
27 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
28 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
29 import org.opendaylight.controller.clustering.it.provider.impl.FlappingSingletonService;
30 import org.opendaylight.controller.clustering.it.provider.impl.GetConstantService;
31 import org.opendaylight.controller.clustering.it.provider.impl.IdIntsListener;
32 import org.opendaylight.controller.clustering.it.provider.impl.PublishNotificationsTask;
33 import org.opendaylight.controller.clustering.it.provider.impl.RoutedGetConstantService;
34 import org.opendaylight.controller.clustering.it.provider.impl.SingletonGetConstantService;
35 import org.opendaylight.controller.clustering.it.provider.impl.WriteTransactionsHandler;
36 import org.opendaylight.controller.clustering.it.provider.impl.YnlListener;
37 import org.opendaylight.mdsal.binding.api.NotificationPublishService;
38 import org.opendaylight.mdsal.binding.api.NotificationService;
39 import org.opendaylight.mdsal.binding.api.RpcProviderService;
40 import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
41 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
42 import org.opendaylight.mdsal.dom.api.DOMDataBroker;
43 import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
44 import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeService;
45 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
46 import org.opendaylight.mdsal.dom.api.DOMDataTreeReadTransaction;
47 import org.opendaylight.mdsal.dom.api.DOMRpcImplementationRegistration;
48 import org.opendaylight.mdsal.dom.api.DOMRpcProviderService;
49 import org.opendaylight.mdsal.dom.api.DOMSchemaService;
50 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
51 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
52 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.AddShardReplicaInput;
53 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.AddShardReplicaOutput;
54 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomePrefixLeaderInput;
55 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomePrefixLeaderOutput;
56 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsInput;
57 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutput;
58 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutputBuilder;
59 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CreatePrefixShardInput;
60 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CreatePrefixShardOutput;
61 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.DeconfigureIdIntsShardInput;
62 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.DeconfigureIdIntsShardOutput;
63 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.IsClientAbortedInput;
64 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.IsClientAbortedOutput;
65 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.OdlMdsalLowlevelControlService;
66 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsInput;
67 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutput;
68 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantInput;
69 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantOutput;
70 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantOutputBuilder;
71 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantInput;
72 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantOutput;
73 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantOutputBuilder;
74 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterDefaultConstantInput;
75 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterDefaultConstantOutput;
76 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterFlappingSingletonInput;
77 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterFlappingSingletonOutput;
78 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterFlappingSingletonOutputBuilder;
79 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantInput;
80 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantOutput;
81 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantOutputBuilder;
82 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardInput;
83 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardOutput;
84 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemoveShardReplicaInput;
85 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemoveShardReplicaOutput;
86 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownPrefixShardReplicaInput;
87 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownPrefixShardReplicaOutput;
88 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownPrefixShardReplicaOutputBuilder;
89 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownShardReplicaInput;
90 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownShardReplicaOutput;
91 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownShardReplicaOutputBuilder;
92 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsInput;
93 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsOutput;
94 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsOutputBuilder;
95 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDdtlInput;
96 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDdtlOutput;
97 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDtclInput;
98 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDtclOutput;
99 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDtclOutputBuilder;
100 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlInput;
101 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlOutput;
102 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlOutputBuilder;
103 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantInput;
104 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantOutput;
105 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantOutputBuilder;
106 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterConstantInput;
107 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterConstantOutput;
108 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterConstantOutputBuilder;
109 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterDefaultConstantInput;
110 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterDefaultConstantOutput;
111 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonInput;
112 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutput;
113 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutputBuilder;
114 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterSingletonConstantInput;
115 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterSingletonConstantOutput;
116 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterSingletonConstantOutputBuilder;
117 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlInput;
118 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutput;
119 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclInput;
120 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutput;
121 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutputBuilder;
122 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlInput;
123 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlOutput;
124 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsInput;
125 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsOutput;
126 import org.opendaylight.yangtools.concepts.ListenerRegistration;
127 import org.opendaylight.yangtools.concepts.ObjectRegistration;
128 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
129 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
130 import org.opendaylight.yangtools.yang.common.RpcResult;
131 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
132 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
133 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
134 import org.slf4j.Logger;
135 import org.slf4j.LoggerFactory;
136 import scala.concurrent.duration.FiniteDuration;
137
138 public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService {
139     private static final Logger LOG = LoggerFactory.getLogger(MdsalLowLevelTestProvider.class);
140
141     private final RpcProviderService rpcRegistry;
142     private final ObjectRegistration<OdlMdsalLowlevelControlService> registration;
143     private final DistributedDataStoreInterface configDataStore;
144     private final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer;
145     private final DOMDataBroker domDataBroker;
146     private final NotificationPublishService notificationPublishService;
147     private final NotificationService notificationService;
148     private final DOMSchemaService schemaService;
149     private final ClusterSingletonServiceProvider singletonService;
150     private final DOMRpcProviderService domRpcService;
151     private final DOMDataTreeChangeService domDataTreeChangeService;
152     private final ActorSystem actorSystem;
153
154     private final Map<InstanceIdentifier<?>, DOMRpcImplementationRegistration<RoutedGetConstantService>>
155             routedRegistrations = new HashMap<>();
156
157     private final Map<String, ListenerRegistration<YnlListener>> ynlRegistrations = new HashMap<>();
158
159     private DOMRpcImplementationRegistration<GetConstantService> globalGetConstantRegistration = null;
160     private ClusterSingletonServiceRegistration getSingletonConstantRegistration;
161     private FlappingSingletonService flappingSingletonService;
162     private ListenerRegistration<DOMDataTreeChangeListener> dtclReg;
163     private IdIntsListener idIntsListener;
164     private final Map<String, PublishNotificationsTask> publishNotificationsTasks = new HashMap<>();
165
166     public MdsalLowLevelTestProvider(final RpcProviderService rpcRegistry,
167                                      final DOMRpcProviderService domRpcService,
168                                      final ClusterSingletonServiceProvider singletonService,
169                                      final DOMSchemaService schemaService,
170                                      final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer,
171                                      final NotificationPublishService notificationPublishService,
172                                      final NotificationService notificationService,
173                                      final DOMDataBroker domDataBroker,
174                                      final DistributedDataStoreInterface configDataStore,
175                                      final ActorSystemProvider actorSystemProvider) {
176         this.rpcRegistry = rpcRegistry;
177         this.domRpcService = domRpcService;
178         this.singletonService = singletonService;
179         this.schemaService = schemaService;
180         this.bindingNormalizedNodeSerializer = bindingNormalizedNodeSerializer;
181         this.notificationPublishService = notificationPublishService;
182         this.notificationService = notificationService;
183         this.domDataBroker = domDataBroker;
184         this.configDataStore = configDataStore;
185         this.actorSystem = actorSystemProvider.getActorSystem();
186
187         domDataTreeChangeService = domDataBroker.getExtensions().getInstance(DOMDataTreeChangeService.class);
188
189         registration = rpcRegistry.registerRpcImplementation(OdlMdsalLowlevelControlService.class, this);
190     }
191
192     @Override
193     @SuppressWarnings("checkstyle:IllegalCatch")
194     public ListenableFuture<RpcResult<UnregisterSingletonConstantOutput>> unregisterSingletonConstant(
195             final UnregisterSingletonConstantInput input) {
196         LOG.info("In unregisterSingletonConstant");
197
198         if (getSingletonConstantRegistration == null) {
199             return RpcResultBuilder.<UnregisterSingletonConstantOutput>failed().withError(ErrorType.RPC, "data-missing",
200                     "No prior RPC was registered").buildFuture();
201         }
202
203         try {
204             getSingletonConstantRegistration.close();
205             getSingletonConstantRegistration = null;
206
207             return RpcResultBuilder.success(new UnregisterSingletonConstantOutputBuilder().build()).buildFuture();
208         } catch (Exception e) {
209             String msg = "Error closing the singleton constant service";
210             LOG.error(msg, e);
211             return RpcResultBuilder.<UnregisterSingletonConstantOutput>failed().withError(
212                     ErrorType.APPLICATION, msg, e).buildFuture();
213         }
214     }
215
216     @Override
217     public ListenableFuture<RpcResult<StartPublishNotificationsOutput>> startPublishNotifications(
218             final StartPublishNotificationsInput input) {
219         LOG.info("In startPublishNotifications - input: {}", input);
220
221         final PublishNotificationsTask task = new PublishNotificationsTask(notificationPublishService, input.getId(),
222                 input.getSeconds().toJava(), input.getNotificationsPerSecond().toJava());
223
224         publishNotificationsTasks.put(input.getId(), task);
225
226         task.start();
227
228         return RpcResultBuilder.success(new StartPublishNotificationsOutputBuilder().build()).buildFuture();
229     }
230
231     @Override
232     public ListenableFuture<RpcResult<SubscribeDtclOutput>> subscribeDtcl(final SubscribeDtclInput input) {
233         LOG.info("In subscribeDtcl - input: {}", input);
234
235         if (dtclReg != null) {
236             return RpcResultBuilder.<SubscribeDtclOutput>failed().withError(ErrorType.RPC,
237                 "data-exists", "There is already a DataTreeChangeListener registered for id-ints").buildFuture();
238         }
239
240         idIntsListener = new IdIntsListener();
241
242         dtclReg = domDataTreeChangeService.registerDataTreeChangeListener(
243             new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, WriteTransactionsHandler.ID_INT_YID),
244             idIntsListener);
245
246         return RpcResultBuilder.success(new SubscribeDtclOutputBuilder().build()).buildFuture();
247     }
248
249     @Override
250     public ListenableFuture<RpcResult<WriteTransactionsOutput>> writeTransactions(final WriteTransactionsInput input) {
251         return WriteTransactionsHandler.start(domDataBroker, input);
252     }
253
254     @Override
255     public ListenableFuture<RpcResult<IsClientAbortedOutput>> isClientAborted(final IsClientAbortedInput input) {
256         return null;
257     }
258
259     @Override
260     public ListenableFuture<RpcResult<RemoveShardReplicaOutput>> removeShardReplica(
261             final RemoveShardReplicaInput input) {
262         return null;
263     }
264
265     @Override
266     public ListenableFuture<RpcResult<SubscribeYnlOutput>> subscribeYnl(final SubscribeYnlInput input) {
267         LOG.info("In subscribeYnl - input: {}", input);
268
269         if (ynlRegistrations.containsKey(input.getId())) {
270             return RpcResultBuilder.<SubscribeYnlOutput>failed().withError(ErrorType.RPC,
271                 "data-exists", "There is already a listener registered for id: " + input.getId()).buildFuture();
272         }
273
274         ynlRegistrations.put(input.getId(),
275                 notificationService.registerNotificationListener(new YnlListener(input.getId())));
276
277         return RpcResultBuilder.success(new SubscribeYnlOutputBuilder().build()).buildFuture();
278     }
279
280     @Override
281     public ListenableFuture<RpcResult<RemovePrefixShardOutput>> removePrefixShard(final RemovePrefixShardInput input) {
282         throw new UnsupportedOperationException();
283     }
284
285     @Override
286     public ListenableFuture<RpcResult<BecomePrefixLeaderOutput>> becomePrefixLeader(
287             final BecomePrefixLeaderInput input) {
288         throw new UnsupportedOperationException();
289     }
290
291     @Override
292     public ListenableFuture<RpcResult<UnregisterBoundConstantOutput>> unregisterBoundConstant(
293             final UnregisterBoundConstantInput input) {
294         LOG.info("In unregisterBoundConstant - {}", input);
295
296         final DOMRpcImplementationRegistration<RoutedGetConstantService> rpcRegistration =
297                 routedRegistrations.remove(input.getContext());
298
299         if (rpcRegistration == null) {
300             return RpcResultBuilder.<UnregisterBoundConstantOutput>failed().withError(
301                 ErrorType.RPC, "data-missing", "No prior RPC was registered for " + input.getContext()).buildFuture();
302         }
303
304         rpcRegistration.close();
305         return RpcResultBuilder.success(new UnregisterBoundConstantOutputBuilder().build()).buildFuture();
306     }
307
308     @Override
309     public ListenableFuture<RpcResult<RegisterSingletonConstantOutput>> registerSingletonConstant(
310             final RegisterSingletonConstantInput input) {
311         LOG.info("In registerSingletonConstant - input: {}", input);
312
313         if (input.getConstant() == null) {
314             return RpcResultBuilder.<RegisterSingletonConstantOutput>failed().withError(
315                     ErrorType.RPC, "invalid-value", "Constant value is null").buildFuture();
316         }
317
318         getSingletonConstantRegistration =
319                 SingletonGetConstantService.registerNew(singletonService, domRpcService, input.getConstant());
320
321         return RpcResultBuilder.success(new RegisterSingletonConstantOutputBuilder().build()).buildFuture();
322     }
323
324     @Override
325     public ListenableFuture<RpcResult<RegisterDefaultConstantOutput>> registerDefaultConstant(
326             final RegisterDefaultConstantInput input) {
327         return null;
328     }
329
330     @Override
331     public ListenableFuture<RpcResult<UnregisterConstantOutput>> unregisterConstant(
332             final UnregisterConstantInput input) {
333         LOG.info("In unregisterConstant");
334
335         if (globalGetConstantRegistration == null) {
336             return RpcResultBuilder.<UnregisterConstantOutput>failed().withError(
337                 ErrorType.RPC, "data-missing", "No prior RPC was registered").buildFuture();
338         }
339
340         globalGetConstantRegistration.close();
341         globalGetConstantRegistration = null;
342
343         return Futures.immediateFuture(RpcResultBuilder.success(new UnregisterConstantOutputBuilder().build()).build());
344     }
345
346     @Override
347     public ListenableFuture<RpcResult<UnregisterFlappingSingletonOutput>> unregisterFlappingSingleton(
348             final UnregisterFlappingSingletonInput input) {
349         LOG.info("In unregisterFlappingSingleton");
350
351         if (flappingSingletonService == null) {
352             return RpcResultBuilder.<UnregisterFlappingSingletonOutput>failed().withError(
353                 ErrorType.RPC, "data-missing", "No prior RPC was registered").buildFuture();
354         }
355
356         final long flapCount = flappingSingletonService.setInactive();
357         flappingSingletonService = null;
358
359         return RpcResultBuilder.success(new UnregisterFlappingSingletonOutputBuilder().setFlapCount(flapCount).build())
360                 .buildFuture();
361     }
362
363     @Override
364     public ListenableFuture<RpcResult<AddShardReplicaOutput>> addShardReplica(final AddShardReplicaInput input) {
365         throw new UnsupportedOperationException();
366     }
367
368     @Override
369     public ListenableFuture<RpcResult<SubscribeDdtlOutput>> subscribeDdtl(final SubscribeDdtlInput input) {
370         throw new UnsupportedOperationException();
371     }
372
373     @Override
374     public ListenableFuture<RpcResult<RegisterBoundConstantOutput>> registerBoundConstant(
375             final RegisterBoundConstantInput input) {
376         LOG.info("In registerBoundConstant - input: {}", input);
377
378         if (input.getContext() == null) {
379             return RpcResultBuilder.<RegisterBoundConstantOutput>failed().withError(
380                     ErrorType.RPC, "invalid-value", "Context value is null").buildFuture();
381         }
382
383         if (input.getConstant() == null) {
384             return RpcResultBuilder.<RegisterBoundConstantOutput>failed().withError(
385                     ErrorType.RPC, "invalid-value", "Constant value is null").buildFuture();
386         }
387
388         if (routedRegistrations.containsKey(input.getContext())) {
389             return RpcResultBuilder.<RegisterBoundConstantOutput>failed().withError(ErrorType.RPC,
390                 "data-exists", "There is already an rpc registered for context: " + input.getContext()).buildFuture();
391         }
392
393         final DOMRpcImplementationRegistration<RoutedGetConstantService> rpcRegistration =
394                 RoutedGetConstantService.registerNew(bindingNormalizedNodeSerializer, domRpcService,
395                         input.getConstant(), input.getContext());
396
397         routedRegistrations.put(input.getContext(), rpcRegistration);
398         return RpcResultBuilder.success(new RegisterBoundConstantOutputBuilder().build()).buildFuture();
399     }
400
401     @Override
402     public ListenableFuture<RpcResult<RegisterFlappingSingletonOutput>> registerFlappingSingleton(
403             final RegisterFlappingSingletonInput input) {
404         LOG.info("In registerFlappingSingleton");
405
406         if (flappingSingletonService != null) {
407             return RpcResultBuilder.<RegisterFlappingSingletonOutput>failed().withError(ErrorType.RPC,
408                 "data-exists", "There is already an rpc registered").buildFuture();
409         }
410
411         flappingSingletonService = new FlappingSingletonService(singletonService);
412
413         return RpcResultBuilder.success(new RegisterFlappingSingletonOutputBuilder().build()).buildFuture();
414     }
415
416     @Override
417     public ListenableFuture<RpcResult<UnsubscribeDtclOutput>> unsubscribeDtcl(final UnsubscribeDtclInput input) {
418         LOG.info("In unsubscribeDtcl");
419
420         if (idIntsListener == null || dtclReg == null) {
421             return RpcResultBuilder.<UnsubscribeDtclOutput>failed().withError(
422                     ErrorType.RPC, "data-missing", "No prior listener was registered").buildFuture();
423         }
424
425         long timeout = 120L;
426         try {
427             idIntsListener.tryFinishProcessing().get(timeout, TimeUnit.SECONDS);
428         } catch (InterruptedException | ExecutionException | TimeoutException e) {
429             LOG.error("Unable to finish notification processing", e);
430             return RpcResultBuilder.<UnsubscribeDtclOutput>failed().withError(ErrorType.APPLICATION,
431                     "Unable to finish notification processing in " + timeout + " seconds", e).buildFuture();
432         }
433
434         dtclReg.close();
435         dtclReg = null;
436
437         if (!idIntsListener.hasTriggered()) {
438             return RpcResultBuilder.<UnsubscribeDtclOutput>failed().withError(ErrorType.APPLICATION, "operation-failed",
439                     "id-ints listener has not received any notifications.").buildFuture();
440         }
441
442         try (DOMDataTreeReadTransaction rTx = domDataBroker.newReadOnlyTransaction()) {
443             final Optional<NormalizedNode> readResult = rTx.read(LogicalDatastoreType.CONFIGURATION,
444                 WriteTransactionsHandler.ID_INT_YID).get();
445
446             if (!readResult.isPresent()) {
447                 return RpcResultBuilder.<UnsubscribeDtclOutput>failed().withError(ErrorType.APPLICATION, "data-missing",
448                         "No data read from id-ints list").buildFuture();
449             }
450
451             final boolean nodesEqual = idIntsListener.checkEqual(readResult.get());
452             if (!nodesEqual) {
453                 LOG.error("Final read of id-int does not match IdIntsListener's copy. {}",
454                         idIntsListener.diffWithLocalCopy(readResult.get()));
455             }
456
457             return RpcResultBuilder.success(new UnsubscribeDtclOutputBuilder().setCopyMatches(nodesEqual))
458                     .buildFuture();
459
460         } catch (final InterruptedException | ExecutionException e) {
461             LOG.error("Final read of id-ints failed", e);
462             return RpcResultBuilder.<UnsubscribeDtclOutput>failed().withError(ErrorType.APPLICATION,
463                     "Final read of id-ints failed", e).buildFuture();
464         }
465     }
466
467     @Override
468     public ListenableFuture<RpcResult<CreatePrefixShardOutput>> createPrefixShard(final CreatePrefixShardInput input) {
469         throw new UnsupportedOperationException();
470     }
471
472     @Override
473     public ListenableFuture<RpcResult<DeconfigureIdIntsShardOutput>> deconfigureIdIntsShard(
474             final DeconfigureIdIntsShardInput input) {
475         return null;
476     }
477
478     @Override
479     public ListenableFuture<RpcResult<UnsubscribeYnlOutput>> unsubscribeYnl(final UnsubscribeYnlInput input) {
480         LOG.info("In unsubscribeYnl - input: {}", input);
481
482         if (!ynlRegistrations.containsKey(input.getId())) {
483             return RpcResultBuilder.<UnsubscribeYnlOutput>failed().withError(
484                 ErrorType.RPC, "data-missing", "No prior listener was registered for " + input.getId()).buildFuture();
485         }
486
487         final ListenerRegistration<YnlListener> reg = ynlRegistrations.remove(input.getId());
488         final UnsubscribeYnlOutput output = reg.getInstance().getOutput();
489
490         reg.close();
491
492         return RpcResultBuilder.<UnsubscribeYnlOutput>success().withResult(output).buildFuture();
493     }
494
495     @Override
496     public ListenableFuture<RpcResult<CheckPublishNotificationsOutput>> checkPublishNotifications(
497             final CheckPublishNotificationsInput input) {
498         LOG.info("In checkPublishNotifications - input: {}", input);
499
500         final PublishNotificationsTask task = publishNotificationsTasks.get(input.getId());
501
502         if (task == null) {
503             return Futures.immediateFuture(RpcResultBuilder.success(
504                     new CheckPublishNotificationsOutputBuilder().setActive(false)).build());
505         }
506
507         final CheckPublishNotificationsOutputBuilder checkPublishNotificationsOutputBuilder =
508                 new CheckPublishNotificationsOutputBuilder().setActive(!task.isFinished());
509
510         if (task.getLastError() != null) {
511             LOG.error("Last error for {}", task, task.getLastError());
512             checkPublishNotificationsOutputBuilder.setLastError(task.getLastError().toString());
513         }
514
515         final CheckPublishNotificationsOutput output =
516                 checkPublishNotificationsOutputBuilder.setPublishCount(task.getCurrentNotif()).build();
517
518         return RpcResultBuilder.success(output).buildFuture();
519     }
520
521     @Override
522     public ListenableFuture<RpcResult<ProduceTransactionsOutput>> produceTransactions(
523             final ProduceTransactionsInput input) {
524         throw new UnsupportedOperationException();
525     }
526
527     @Override
528     public ListenableFuture<RpcResult<ShutdownShardReplicaOutput>> shutdownShardReplica(
529             final ShutdownShardReplicaInput input) {
530         LOG.info("In shutdownShardReplica - input: {}", input);
531
532         final String shardName = input.getShardName();
533         if (Strings.isNullOrEmpty(shardName)) {
534             return RpcResultBuilder.<ShutdownShardReplicaOutput>failed().withError(ErrorType.RPC, "bad-element",
535                 shardName + "is not a valid shard name").buildFuture();
536         }
537
538         return shutdownShardGracefully(shardName, new ShutdownShardReplicaOutputBuilder().build());
539     }
540
541     @Override
542     public ListenableFuture<RpcResult<ShutdownPrefixShardReplicaOutput>> shutdownPrefixShardReplica(
543             final ShutdownPrefixShardReplicaInput input) {
544         LOG.info("shutdownPrefixShardReplica - input: {}", input);
545
546         final InstanceIdentifier<?> shardPrefix = input.getPrefix();
547
548         if (shardPrefix == null) {
549             return RpcResultBuilder.<ShutdownPrefixShardReplicaOutput>failed().withError(ErrorType.RPC, "bad-element",
550                     "A valid shard prefix must be specified").buildFuture();
551         }
552
553         final YangInstanceIdentifier shardPath = bindingNormalizedNodeSerializer.toYangInstanceIdentifier(shardPrefix);
554         final String cleanPrefixShardName = ClusterUtils.getCleanShardName(shardPath);
555
556         return shutdownShardGracefully(cleanPrefixShardName, new ShutdownPrefixShardReplicaOutputBuilder().build());
557     }
558
559     private <T> SettableFuture<RpcResult<T>> shutdownShardGracefully(final String shardName, final T success) {
560         final SettableFuture<RpcResult<T>> rpcResult = SettableFuture.create();
561         final ActorUtils context = configDataStore.getActorUtils();
562
563         long timeoutInMS = Math.max(context.getDatastoreContext().getShardRaftConfig()
564                 .getElectionTimeOutInterval().$times(3).toMillis(), 10000);
565         final FiniteDuration duration = FiniteDuration.apply(timeoutInMS, TimeUnit.MILLISECONDS);
566         final scala.concurrent.Promise<Boolean> shutdownShardAsk = akka.dispatch.Futures.promise();
567
568         context.findLocalShardAsync(shardName).onComplete(new OnComplete<ActorRef>() {
569             @Override
570             public void onComplete(final Throwable throwable, final ActorRef actorRef) {
571                 if (throwable != null) {
572                     shutdownShardAsk.failure(throwable);
573                 } else {
574                     shutdownShardAsk.completeWith(Patterns.gracefulStop(actorRef, duration, Shutdown.INSTANCE));
575                 }
576             }
577         }, context.getClientDispatcher());
578
579         shutdownShardAsk.future().onComplete(new OnComplete<Boolean>() {
580             @Override
581             public void onComplete(final Throwable throwable, final Boolean gracefulStopResult) {
582                 if (throwable != null) {
583                     final RpcResult<T> failedResult = RpcResultBuilder.<T>failed()
584                             .withError(ErrorType.APPLICATION, "Failed to gracefully shutdown shard", throwable).build();
585                     rpcResult.set(failedResult);
586                 } else {
587                     // according to Patterns.gracefulStop API, we don't have to
588                     // check value of gracefulStopResult
589                     rpcResult.set(RpcResultBuilder.success(success).build());
590                 }
591             }
592         }, context.getClientDispatcher());
593         return rpcResult;
594     }
595
596     @Override
597     public ListenableFuture<RpcResult<RegisterConstantOutput>> registerConstant(final RegisterConstantInput input) {
598         LOG.info("In registerConstant - input: {}", input);
599
600         if (input.getConstant() == null) {
601             return RpcResultBuilder.<RegisterConstantOutput>failed().withError(
602                     ErrorType.RPC, "invalid-value", "Constant value is null").buildFuture();
603         }
604
605         if (globalGetConstantRegistration != null) {
606             return RpcResultBuilder.<RegisterConstantOutput>failed().withError(ErrorType.RPC,
607                     "data-exists", "There is already an rpc registered").buildFuture();
608         }
609
610         globalGetConstantRegistration = GetConstantService.registerNew(domRpcService, input.getConstant());
611         return RpcResultBuilder.success(new RegisterConstantOutputBuilder().build()).buildFuture();
612     }
613
614     @Override
615     public ListenableFuture<RpcResult<UnregisterDefaultConstantOutput>> unregisterDefaultConstant(
616             final UnregisterDefaultConstantInput input) {
617         throw new UnsupportedOperationException();
618     }
619
620     @Override
621     @SuppressWarnings("checkstyle:IllegalCatch")
622     public ListenableFuture<RpcResult<UnsubscribeDdtlOutput>> unsubscribeDdtl(final UnsubscribeDdtlInput input) {
623         throw new UnsupportedOperationException();
624     }
625 }