8c4e9716f054de39b58324eb4a3245899d7d8b6a
[controller.git] / opendaylight / md-sal / samples / clustering-test-app / provider / src / main / java / org / opendaylight / controller / clustering / it / provider / MdsalLowLevelTestProvider.java
1 /*
2  * Copyright (c) 2017 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.clustering.it.provider;
9
10 import static akka.actor.ActorRef.noSender;
11
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSystem;
14 import akka.actor.PoisonPill;
15 import akka.actor.Props;
16 import akka.dispatch.OnComplete;
17 import akka.pattern.Patterns;
18 import com.google.common.base.Optional;
19 import com.google.common.base.Strings;
20 import com.google.common.util.concurrent.Futures;
21 import com.google.common.util.concurrent.ListenableFuture;
22 import com.google.common.util.concurrent.SettableFuture;
23 import java.util.Collections;
24 import java.util.HashMap;
25 import java.util.Map;
26 import java.util.concurrent.ExecutionException;
27 import java.util.concurrent.TimeUnit;
28 import java.util.concurrent.TimeoutException;
29 import org.opendaylight.controller.cluster.ActorSystemProvider;
30 import org.opendaylight.controller.cluster.databroker.actors.dds.ClientLocalHistory;
31 import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
32 import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
33 import org.opendaylight.controller.cluster.databroker.actors.dds.SimpleDataStoreClientActor;
34 import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
35 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
36 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
37 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
38 import org.opendaylight.controller.cluster.sharding.DistributedShardFactory;
39 import org.opendaylight.controller.clustering.it.provider.impl.FlappingSingletonService;
40 import org.opendaylight.controller.clustering.it.provider.impl.GetConstantService;
41 import org.opendaylight.controller.clustering.it.provider.impl.IdIntsDOMDataTreeLIstener;
42 import org.opendaylight.controller.clustering.it.provider.impl.IdIntsListener;
43 import org.opendaylight.controller.clustering.it.provider.impl.PrefixLeaderHandler;
44 import org.opendaylight.controller.clustering.it.provider.impl.PrefixShardHandler;
45 import org.opendaylight.controller.clustering.it.provider.impl.ProduceTransactionsHandler;
46 import org.opendaylight.controller.clustering.it.provider.impl.PublishNotificationsTask;
47 import org.opendaylight.controller.clustering.it.provider.impl.RoutedGetConstantService;
48 import org.opendaylight.controller.clustering.it.provider.impl.SingletonGetConstantService;
49 import org.opendaylight.controller.clustering.it.provider.impl.WriteTransactionsHandler;
50 import org.opendaylight.controller.clustering.it.provider.impl.YnlListener;
51 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
52 import org.opendaylight.controller.md.sal.binding.api.NotificationService;
53 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
54 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
55 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
56 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeService;
57 import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationRegistration;
58 import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService;
59 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
60 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
61 import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
62 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
63 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
64 import org.opendaylight.mdsal.dom.api.DOMDataTreeLoopException;
65 import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
66 import org.opendaylight.mdsal.dom.api.DOMSchemaService;
67 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
68 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
69 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.AddShardReplicaInput;
70 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.AddShardReplicaOutput;
71 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomePrefixLeaderInput;
72 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomePrefixLeaderOutput;
73 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsInput;
74 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutput;
75 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutputBuilder;
76 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CreatePrefixShardInput;
77 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CreatePrefixShardOutput;
78 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.DeconfigureIdIntsShardInput;
79 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.DeconfigureIdIntsShardOutput;
80 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.IsClientAbortedInput;
81 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.IsClientAbortedOutput;
82 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.OdlMdsalLowlevelControlService;
83 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsInput;
84 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutput;
85 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantInput;
86 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantOutput;
87 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantOutputBuilder;
88 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantInput;
89 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantOutput;
90 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantOutputBuilder;
91 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterDefaultConstantInput;
92 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterDefaultConstantOutput;
93 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterFlappingSingletonInput;
94 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterFlappingSingletonOutput;
95 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterFlappingSingletonOutputBuilder;
96 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantInput;
97 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantOutput;
98 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantOutputBuilder;
99 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardInput;
100 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardOutput;
101 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemoveShardReplicaInput;
102 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemoveShardReplicaOutput;
103 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownPrefixShardReplicaInput;
104 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownPrefixShardReplicaOutput;
105 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownPrefixShardReplicaOutputBuilder;
106 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownShardReplicaInput;
107 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownShardReplicaOutput;
108 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownShardReplicaOutputBuilder;
109 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsInput;
110 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsOutput;
111 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsOutputBuilder;
112 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDdtlInput;
113 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDdtlOutput;
114 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDdtlOutputBuilder;
115 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDtclInput;
116 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDtclOutput;
117 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDtclOutputBuilder;
118 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlInput;
119 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlOutput;
120 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlOutputBuilder;
121 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantInput;
122 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantOutput;
123 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantOutputBuilder;
124 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterConstantInput;
125 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterConstantOutput;
126 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterConstantOutputBuilder;
127 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterDefaultConstantInput;
128 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterDefaultConstantOutput;
129 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonInput;
130 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutput;
131 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutputBuilder;
132 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterSingletonConstantInput;
133 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterSingletonConstantOutput;
134 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterSingletonConstantOutputBuilder;
135 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlInput;
136 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutput;
137 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutputBuilder;
138 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclInput;
139 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutput;
140 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutputBuilder;
141 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlInput;
142 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlOutput;
143 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsInput;
144 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsOutput;
145 import org.opendaylight.yangtools.concepts.ListenerRegistration;
146 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
147 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
148 import org.opendaylight.yangtools.yang.common.RpcResult;
149 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
150 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
151 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
152 import org.slf4j.Logger;
153 import org.slf4j.LoggerFactory;
154 import scala.concurrent.duration.FiniteDuration;
155
156 public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService {
157
    private static final Logger LOG = LoggerFactory.getLogger(MdsalLowLevelTestProvider.class);
    // Controller-flavoured CONFIGURATION datastore type. It is spelled out fully qualified because the simple
    // name LogicalDatastoreType is already bound to the org.opendaylight.mdsal.common.api import.
    private static final org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType CONTROLLER_CONFIG =
            org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;

    // Collaborators: handed in through the constructor, or derived from constructor arguments.
    private final RpcProviderRegistry rpcRegistry;
    // Handle returned when this instance is added to rpcRegistry as the OdlMdsalLowlevelControlService impl.
    private final BindingAwareBroker.RpcRegistration<OdlMdsalLowlevelControlService> registration;
    private final DistributedShardFactory distributedShardFactory;
    private final DistributedDataStoreInterface configDataStore;
    private final DOMDataTreeService domDataTreeService;
    private final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer;
    private final DOMDataBroker domDataBroker;
    private final NotificationPublishService notificationPublishService;
    private final NotificationService notificationService;
    private final DOMSchemaService schemaService;
    private final ClusterSingletonServiceProvider singletonService;
    private final DOMRpcProviderService domRpcService;
    // Both handlers are built in the constructor; the prefix leader/shard RPCs delegate to them.
    private final PrefixLeaderHandler prefixLeaderHandler;
    private final PrefixShardHandler prefixShardHandler;
    // Looked up from domDataBroker's supported extensions in the constructor.
    private final DOMDataTreeChangeService domDataTreeChangeService;
    // Obtained from the ActorSystemProvider passed to the constructor.
    private final ActorSystem actorSystem;

    // Routed get-constant RPC registrations, keyed by the context passed to registerBoundConstant.
    private final Map<InstanceIdentifier<?>, DOMRpcImplementationRegistration<RoutedGetConstantService>>
            routedRegistrations = new HashMap<>();

    // Notification listener registrations, keyed by the id passed to subscribeYnl.
    private final Map<String, ListenerRegistration<YnlListener>> ynlRegistrations = new HashMap<>();

    // Closed and reset to null by unregisterConstant; null means "nothing registered".
    private DOMRpcImplementationRegistration<GetConstantService> globalGetConstantRegistration = null;
    // Set by registerSingletonConstant, closed and reset to null by unregisterSingletonConstant.
    private ClusterSingletonServiceRegistration getSingletonConstantRegistration;
    // Set by registerFlappingSingleton, reset to null by unregisterFlappingSingleton.
    private FlappingSingletonService flappingSingletonService;
    // Data tree change listener and its registration, both set by subscribeDtcl.
    private ListenerRegistration<DOMDataTreeChangeListener> dtclReg;
    private IdIntsListener idIntsListener;
    // Notification publishing tasks, keyed by the id passed to startPublishNotifications.
    private final Map<String, PublishNotificationsTask> publishNotificationsTasks = new HashMap<>();
    // DOM data tree listener and its registration, both set by subscribeDdtl.
    private ListenerRegistration<IdIntsDOMDataTreeLIstener> ddtlReg;
    private IdIntsDOMDataTreeLIstener idIntsDdtl;
192
193
194
195     public MdsalLowLevelTestProvider(final RpcProviderRegistry rpcRegistry,
196                                      final DOMRpcProviderService domRpcService,
197                                      final ClusterSingletonServiceProvider singletonService,
198                                      final DOMSchemaService schemaService,
199                                      final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer,
200                                      final NotificationPublishService notificationPublishService,
201                                      final NotificationService notificationService,
202                                      final DOMDataBroker domDataBroker,
203                                      final DOMDataTreeService domDataTreeService,
204                                      final DistributedShardFactory distributedShardFactory,
205                                      final DistributedDataStoreInterface configDataStore,
206                                      final ActorSystemProvider actorSystemProvider) {
207         this.rpcRegistry = rpcRegistry;
208         this.domRpcService = domRpcService;
209         this.singletonService = singletonService;
210         this.schemaService = schemaService;
211         this.bindingNormalizedNodeSerializer = bindingNormalizedNodeSerializer;
212         this.notificationPublishService = notificationPublishService;
213         this.notificationService = notificationService;
214         this.domDataBroker = domDataBroker;
215         this.domDataTreeService = domDataTreeService;
216         this.distributedShardFactory = distributedShardFactory;
217         this.configDataStore = configDataStore;
218         this.actorSystem = actorSystemProvider.getActorSystem();
219
220         this.prefixLeaderHandler = new PrefixLeaderHandler(domDataTreeService, bindingNormalizedNodeSerializer);
221
222         domDataTreeChangeService =
223                 (DOMDataTreeChangeService) domDataBroker.getSupportedExtensions().get(DOMDataTreeChangeService.class);
224
225         registration = rpcRegistry.addRpcImplementation(OdlMdsalLowlevelControlService.class, this);
226
227         prefixShardHandler = new PrefixShardHandler(distributedShardFactory, domDataTreeService,
228                 bindingNormalizedNodeSerializer);
229     }
230
231     @Override
232     @SuppressWarnings("checkstyle:IllegalCatch")
233     public ListenableFuture<RpcResult<UnregisterSingletonConstantOutput>> unregisterSingletonConstant(
234             final UnregisterSingletonConstantInput input) {
235         LOG.info("In unregisterSingletonConstant");
236
237         if (getSingletonConstantRegistration == null) {
238             return RpcResultBuilder.<UnregisterSingletonConstantOutput>failed().withError(ErrorType.RPC, "data-missing",
239                     "No prior RPC was registered").buildFuture();
240         }
241
242         try {
243             getSingletonConstantRegistration.close();
244             getSingletonConstantRegistration = null;
245
246             return RpcResultBuilder.success(new UnregisterSingletonConstantOutputBuilder().build()).buildFuture();
247         } catch (Exception e) {
248             String msg = "Error closing the singleton constant service";
249             LOG.error(msg, e);
250             return RpcResultBuilder.<UnregisterSingletonConstantOutput>failed().withError(
251                     ErrorType.APPLICATION, msg, e).buildFuture();
252         }
253     }
254
255     @Override
256     public ListenableFuture<RpcResult<StartPublishNotificationsOutput>> startPublishNotifications(
257             final StartPublishNotificationsInput input) {
258         LOG.info("In startPublishNotifications - input: {}", input);
259
260         final PublishNotificationsTask task = new PublishNotificationsTask(notificationPublishService, input.getId(),
261                 input.getSeconds(), input.getNotificationsPerSecond());
262
263         publishNotificationsTasks.put(input.getId(), task);
264
265         task.start();
266
267         return RpcResultBuilder.success(new StartPublishNotificationsOutputBuilder().build()).buildFuture();
268     }
269
270     @Override
271     public ListenableFuture<RpcResult<SubscribeDtclOutput>> subscribeDtcl(final SubscribeDtclInput input) {
272         LOG.info("In subscribeDtcl - input: {}", input);
273
274         if (dtclReg != null) {
275             return RpcResultBuilder.<SubscribeDtclOutput>failed().withError(ErrorType.RPC,
276                 "data-exists", "There is already a DataTreeChangeListener registered for id-ints").buildFuture();
277         }
278
279         idIntsListener = new IdIntsListener();
280
281         dtclReg = domDataTreeChangeService
282                 .registerDataTreeChangeListener(
283                         new org.opendaylight.controller.md.sal.dom.api.DOMDataTreeIdentifier(
284                                 CONTROLLER_CONFIG, WriteTransactionsHandler.ID_INT_YID),
285                         idIntsListener);
286
287         return RpcResultBuilder.success(new SubscribeDtclOutputBuilder().build()).buildFuture();
288     }
289
290     @Override
291     public ListenableFuture<RpcResult<WriteTransactionsOutput>> writeTransactions(final WriteTransactionsInput input) {
292         return WriteTransactionsHandler.start(domDataBroker, input);
293     }
294
295     @Override
296     public ListenableFuture<RpcResult<IsClientAbortedOutput>> isClientAborted(final IsClientAbortedInput input) {
297         return null;
298     }
299
300     @Override
301     public ListenableFuture<RpcResult<RemoveShardReplicaOutput>> removeShardReplica(
302             final RemoveShardReplicaInput input) {
303         return null;
304     }
305
306     @Override
307     public ListenableFuture<RpcResult<SubscribeYnlOutput>> subscribeYnl(final SubscribeYnlInput input) {
308         LOG.info("In subscribeYnl - input: {}", input);
309
310         if (ynlRegistrations.containsKey(input.getId())) {
311             return RpcResultBuilder.<SubscribeYnlOutput>failed().withError(ErrorType.RPC,
312                 "data-exists", "There is already a listener registered for id: " + input.getId()).buildFuture();
313         }
314
315         ynlRegistrations.put(input.getId(),
316                 notificationService.registerNotificationListener(new YnlListener(input.getId())));
317
318         return RpcResultBuilder.success(new SubscribeYnlOutputBuilder().build()).buildFuture();
319     }
320
321     @Override
322     public ListenableFuture<RpcResult<RemovePrefixShardOutput>> removePrefixShard(final RemovePrefixShardInput input) {
323         LOG.info("In removePrefixShard - input: {}", input);
324
325         return prefixShardHandler.onRemovePrefixShard(input);
326     }
327
328     @Override
329     public ListenableFuture<RpcResult<BecomePrefixLeaderOutput>> becomePrefixLeader(
330             final BecomePrefixLeaderInput input) {
331         LOG.info("n becomePrefixLeader - input: {}", input);
332
333         return prefixLeaderHandler.makeLeaderLocal(input);
334     }
335
336     @Override
337     public ListenableFuture<RpcResult<UnregisterBoundConstantOutput>> unregisterBoundConstant(
338             final UnregisterBoundConstantInput input) {
339         LOG.info("In unregisterBoundConstant - {}", input);
340
341         final DOMRpcImplementationRegistration<RoutedGetConstantService> rpcRegistration =
342                 routedRegistrations.remove(input.getContext());
343
344         if (rpcRegistration == null) {
345             return RpcResultBuilder.<UnregisterBoundConstantOutput>failed().withError(
346                 ErrorType.RPC, "data-missing", "No prior RPC was registered for " + input.getContext()).buildFuture();
347         }
348
349         rpcRegistration.close();
350         return RpcResultBuilder.success(new UnregisterBoundConstantOutputBuilder().build()).buildFuture();
351     }
352
353     @Override
354     public ListenableFuture<RpcResult<RegisterSingletonConstantOutput>> registerSingletonConstant(
355             final RegisterSingletonConstantInput input) {
356         LOG.info("In registerSingletonConstant - input: {}", input);
357
358         if (input.getConstant() == null) {
359             return RpcResultBuilder.<RegisterSingletonConstantOutput>failed().withError(
360                     ErrorType.RPC, "invalid-value", "Constant value is null").buildFuture();
361         }
362
363         getSingletonConstantRegistration =
364                 SingletonGetConstantService.registerNew(singletonService, domRpcService, input.getConstant());
365
366         return RpcResultBuilder.success(new RegisterSingletonConstantOutputBuilder().build()).buildFuture();
367     }
368
369     @Override
370     public ListenableFuture<RpcResult<RegisterDefaultConstantOutput>> registerDefaultConstant(
371             final RegisterDefaultConstantInput input) {
372         return null;
373     }
374
375     @Override
376     public ListenableFuture<RpcResult<UnregisterConstantOutput>> unregisterConstant(
377             final UnregisterConstantInput input) {
378         LOG.info("In unregisterConstant");
379
380         if (globalGetConstantRegistration == null) {
381             return RpcResultBuilder.<UnregisterConstantOutput>failed().withError(
382                 ErrorType.RPC, "data-missing", "No prior RPC was registered").buildFuture();
383         }
384
385         globalGetConstantRegistration.close();
386         globalGetConstantRegistration = null;
387
388         return Futures.immediateFuture(RpcResultBuilder.success(new UnregisterConstantOutputBuilder().build()).build());
389     }
390
391     @Override
392     public ListenableFuture<RpcResult<UnregisterFlappingSingletonOutput>> unregisterFlappingSingleton(
393             final UnregisterFlappingSingletonInput input) {
394         LOG.info("In unregisterFlappingSingleton");
395
396         if (flappingSingletonService == null) {
397             return RpcResultBuilder.<UnregisterFlappingSingletonOutput>failed().withError(
398                 ErrorType.RPC, "data-missing", "No prior RPC was registered").buildFuture();
399         }
400
401         final long flapCount = flappingSingletonService.setInactive();
402         flappingSingletonService = null;
403
404         return RpcResultBuilder.success(new UnregisterFlappingSingletonOutputBuilder().setFlapCount(flapCount).build())
405                 .buildFuture();
406     }
407
408     @Override
409     public ListenableFuture<RpcResult<AddShardReplicaOutput>> addShardReplica(final AddShardReplicaInput input) {
410         return null;
411     }
412
413     @Override
414     public ListenableFuture<RpcResult<SubscribeDdtlOutput>> subscribeDdtl(final SubscribeDdtlInput input) {
415         LOG.info("In subscribeDdtl");
416
417         if (ddtlReg != null) {
418             return RpcResultBuilder.<SubscribeDdtlOutput>failed().withError(ErrorType.RPC,
419                 "data-exists", "There is already a listener registered for id-ints").buildFuture();
420         }
421
422         idIntsDdtl = new IdIntsDOMDataTreeLIstener();
423
424         try {
425             ddtlReg = domDataTreeService.registerListener(idIntsDdtl,
426                     Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION,
427                             ProduceTransactionsHandler.ID_INT_YID)),
428                     true, Collections.emptyList());
429         } catch (DOMDataTreeLoopException e) {
430             LOG.error("Failed to register DOMDataTreeListener", e);
431             return RpcResultBuilder.<SubscribeDdtlOutput>failed().withError(
432                 ErrorType.APPLICATION, "Failed to register DOMDataTreeListener", e).buildFuture();
433         }
434
435         return RpcResultBuilder.success(new SubscribeDdtlOutputBuilder().build()).buildFuture();
436     }
437
438     @Override
439     public ListenableFuture<RpcResult<RegisterBoundConstantOutput>> registerBoundConstant(
440             final RegisterBoundConstantInput input) {
441         LOG.info("In registerBoundConstant - input: {}", input);
442
443         if (input.getContext() == null) {
444             return RpcResultBuilder.<RegisterBoundConstantOutput>failed().withError(
445                     ErrorType.RPC, "invalid-value", "Context value is null").buildFuture();
446         }
447
448         if (input.getConstant() == null) {
449             return RpcResultBuilder.<RegisterBoundConstantOutput>failed().withError(
450                     ErrorType.RPC, "invalid-value", "Constant value is null").buildFuture();
451         }
452
453         if (routedRegistrations.containsKey(input.getContext())) {
454             return RpcResultBuilder.<RegisterBoundConstantOutput>failed().withError(ErrorType.RPC,
455                 "data-exists", "There is already an rpc registered for context: " + input.getContext()).buildFuture();
456         }
457
458         final DOMRpcImplementationRegistration<RoutedGetConstantService> rpcRegistration =
459                 RoutedGetConstantService.registerNew(bindingNormalizedNodeSerializer, domRpcService,
460                         input.getConstant(), input.getContext());
461
462         routedRegistrations.put(input.getContext(), rpcRegistration);
463         return RpcResultBuilder.success(new RegisterBoundConstantOutputBuilder().build()).buildFuture();
464     }
465
466     @Override
467     public ListenableFuture<RpcResult<RegisterFlappingSingletonOutput>> registerFlappingSingleton(
468             final RegisterFlappingSingletonInput input) {
469         LOG.info("In registerFlappingSingleton");
470
471         if (flappingSingletonService != null) {
472             return RpcResultBuilder.<RegisterFlappingSingletonOutput>failed().withError(ErrorType.RPC,
473                 "data-exists", "There is already an rpc registered").buildFuture();
474         }
475
476         flappingSingletonService = new FlappingSingletonService(singletonService);
477
478         return RpcResultBuilder.success(new RegisterFlappingSingletonOutputBuilder().build()).buildFuture();
479     }
480
481     @Override
482     public ListenableFuture<RpcResult<UnsubscribeDtclOutput>> unsubscribeDtcl(final UnsubscribeDtclInput input) {
483         LOG.info("In unsubscribeDtcl");
484
485         if (idIntsListener == null || dtclReg == null) {
486             return RpcResultBuilder.<UnsubscribeDtclOutput>failed().withError(
487                     ErrorType.RPC, "data-missing", "No prior listener was registered").buildFuture();
488         }
489
490         long timeout = 120L;
491         try {
492             idIntsListener.tryFinishProcessing().get(timeout, TimeUnit.SECONDS);
493         } catch (InterruptedException | ExecutionException | TimeoutException e) {
494             LOG.error("Unable to finish notification processing", e);
495             return RpcResultBuilder.<UnsubscribeDtclOutput>failed().withError(ErrorType.APPLICATION,
496                     "Unable to finish notification processing in " + timeout + " seconds", e).buildFuture();
497         }
498
499         dtclReg.close();
500         dtclReg = null;
501
502         if (!idIntsListener.hasTriggered()) {
503             return RpcResultBuilder.<UnsubscribeDtclOutput>failed().withError(ErrorType.APPLICATION, "operation-failed",
504                     "id-ints listener has not received any notifications.").buildFuture();
505         }
506
507         final DOMDataReadOnlyTransaction rTx = domDataBroker.newReadOnlyTransaction();
508         try {
509             final Optional<NormalizedNode<?, ?>> readResult =
510                     rTx.read(CONTROLLER_CONFIG, WriteTransactionsHandler.ID_INT_YID).get();
511
512             if (!readResult.isPresent()) {
513                 return RpcResultBuilder.<UnsubscribeDtclOutput>failed().withError(ErrorType.APPLICATION, "data-missing",
514                         "No data read from id-ints list").buildFuture();
515             }
516
517             final boolean nodesEqual = idIntsListener.checkEqual(readResult.get());
518             if (!nodesEqual) {
519                 LOG.error("Final read of id-int does not match IdIntsListener's copy. {}",
520                         idIntsListener.diffWithLocalCopy(readResult.get()));
521             }
522
523             return RpcResultBuilder.success(new UnsubscribeDtclOutputBuilder().setCopyMatches(nodesEqual))
524                     .buildFuture();
525
526         } catch (final InterruptedException | ExecutionException e) {
527             LOG.error("Final read of id-ints failed", e);
528             return RpcResultBuilder.<UnsubscribeDtclOutput>failed().withError(ErrorType.APPLICATION,
529                     "Final read of id-ints failed", e).buildFuture();
530         }
531     }
532
533     @Override
534     public ListenableFuture<RpcResult<CreatePrefixShardOutput>> createPrefixShard(final CreatePrefixShardInput input) {
535         LOG.info("In createPrefixShard - input: {}", input);
536
537         return prefixShardHandler.onCreatePrefixShard(input);
538     }
539
540     @Override
541     public ListenableFuture<RpcResult<DeconfigureIdIntsShardOutput>> deconfigureIdIntsShard(
542             final DeconfigureIdIntsShardInput input) {
543         return null;
544     }
545
546     @Override
547     public ListenableFuture<RpcResult<UnsubscribeYnlOutput>> unsubscribeYnl(final UnsubscribeYnlInput input) {
548         LOG.info("In unsubscribeYnl - input: {}", input);
549
550         if (!ynlRegistrations.containsKey(input.getId())) {
551             return RpcResultBuilder.<UnsubscribeYnlOutput>failed().withError(
552                 ErrorType.RPC, "data-missing", "No prior listener was registered for " + input.getId()).buildFuture();
553         }
554
555         final ListenerRegistration<YnlListener> reg = ynlRegistrations.remove(input.getId());
556         final UnsubscribeYnlOutput output = reg.getInstance().getOutput();
557
558         reg.close();
559
560         return RpcResultBuilder.<UnsubscribeYnlOutput>success().withResult(output).buildFuture();
561     }
562
563     @Override
564     public ListenableFuture<RpcResult<CheckPublishNotificationsOutput>> checkPublishNotifications(
565             final CheckPublishNotificationsInput input) {
566         LOG.info("In checkPublishNotifications - input: {}", input);
567
568         final PublishNotificationsTask task = publishNotificationsTasks.get(input.getId());
569
570         if (task == null) {
571             return Futures.immediateFuture(RpcResultBuilder.success(
572                     new CheckPublishNotificationsOutputBuilder().setActive(false)).build());
573         }
574
575         final CheckPublishNotificationsOutputBuilder checkPublishNotificationsOutputBuilder =
576                 new CheckPublishNotificationsOutputBuilder().setActive(!task.isFinished());
577
578         if (task.getLastError() != null) {
579             LOG.error("Last error for {}", task, task.getLastError());
580             checkPublishNotificationsOutputBuilder.setLastError(task.getLastError().toString());
581         }
582
583         final CheckPublishNotificationsOutput output =
584                 checkPublishNotificationsOutputBuilder.setPublishCount(task.getCurrentNotif()).build();
585
586         return RpcResultBuilder.success(output).buildFuture();
587     }
588
589     @Override
590     public ListenableFuture<RpcResult<ProduceTransactionsOutput>> produceTransactions(
591             final ProduceTransactionsInput input) {
592         LOG.info("In produceTransactions - input: {}", input);
593         return ProduceTransactionsHandler.start(domDataTreeService, input);
594     }
595
596     @Override
597     public ListenableFuture<RpcResult<ShutdownShardReplicaOutput>> shutdownShardReplica(
598             final ShutdownShardReplicaInput input) {
599         LOG.info("In shutdownShardReplica - input: {}", input);
600
601         final String shardName = input.getShardName();
602         if (Strings.isNullOrEmpty(shardName)) {
603             return RpcResultBuilder.<ShutdownShardReplicaOutput>failed().withError(ErrorType.RPC, "bad-element",
604                 shardName + "is not a valid shard name").buildFuture();
605         }
606
607         return shutdownShardGracefully(shardName, new ShutdownShardReplicaOutputBuilder().build());
608     }
609
610     @Override
611     public ListenableFuture<RpcResult<ShutdownPrefixShardReplicaOutput>> shutdownPrefixShardReplica(
612             final ShutdownPrefixShardReplicaInput input) {
613         LOG.info("shutdownPrefixShardReplica - input: {}", input);
614
615         final InstanceIdentifier<?> shardPrefix = input.getPrefix();
616
617         if (shardPrefix == null) {
618             return RpcResultBuilder.<ShutdownPrefixShardReplicaOutput>failed().withError(ErrorType.RPC, "bad-element",
619                     "A valid shard prefix must be specified").buildFuture();
620         }
621
622         final YangInstanceIdentifier shardPath = bindingNormalizedNodeSerializer.toYangInstanceIdentifier(shardPrefix);
623         final String cleanPrefixShardName = ClusterUtils.getCleanShardName(shardPath);
624
625         return shutdownShardGracefully(cleanPrefixShardName, new ShutdownPrefixShardReplicaOutputBuilder().build());
626     }
627
628     private <T> SettableFuture<RpcResult<T>> shutdownShardGracefully(final String shardName, final T success) {
629         final SettableFuture<RpcResult<T>> rpcResult = SettableFuture.create();
630         final ActorUtils context = configDataStore.getActorUtils();
631
632         long timeoutInMS = Math.max(context.getDatastoreContext().getShardRaftConfig()
633                 .getElectionTimeOutInterval().$times(3).toMillis(), 10000);
634         final FiniteDuration duration = FiniteDuration.apply(timeoutInMS, TimeUnit.MILLISECONDS);
635         final scala.concurrent.Promise<Boolean> shutdownShardAsk = akka.dispatch.Futures.promise();
636
637         context.findLocalShardAsync(shardName).onComplete(new OnComplete<ActorRef>() {
638             @Override
639             public void onComplete(final Throwable throwable, final ActorRef actorRef) {
640                 if (throwable != null) {
641                     shutdownShardAsk.failure(throwable);
642                 } else {
643                     shutdownShardAsk.completeWith(Patterns.gracefulStop(actorRef, duration, Shutdown.INSTANCE));
644                 }
645             }
646         }, context.getClientDispatcher());
647
648         shutdownShardAsk.future().onComplete(new OnComplete<Boolean>() {
649             @Override
650             public void onComplete(final Throwable throwable, final Boolean gracefulStopResult) {
651                 if (throwable != null) {
652                     final RpcResult<T> failedResult = RpcResultBuilder.<T>failed()
653                             .withError(ErrorType.APPLICATION, "Failed to gracefully shutdown shard", throwable).build();
654                     rpcResult.set(failedResult);
655                 } else {
656                     // according to Patterns.gracefulStop API, we don't have to
657                     // check value of gracefulStopResult
658                     rpcResult.set(RpcResultBuilder.success(success).build());
659                 }
660             }
661         }, context.getClientDispatcher());
662         return rpcResult;
663     }
664
665     @Override
666     public ListenableFuture<RpcResult<RegisterConstantOutput>> registerConstant(final RegisterConstantInput input) {
667         LOG.info("In registerConstant - input: {}", input);
668
669         if (input.getConstant() == null) {
670             return RpcResultBuilder.<RegisterConstantOutput>failed().withError(
671                     ErrorType.RPC, "invalid-value", "Constant value is null").buildFuture();
672         }
673
674         if (globalGetConstantRegistration != null) {
675             return RpcResultBuilder.<RegisterConstantOutput>failed().withError(ErrorType.RPC,
676                     "data-exists", "There is already an rpc registered").buildFuture();
677         }
678
679         globalGetConstantRegistration = GetConstantService.registerNew(domRpcService, input.getConstant());
680         return RpcResultBuilder.success(new RegisterConstantOutputBuilder().build()).buildFuture();
681     }
682
683     @Override
684     public ListenableFuture<RpcResult<UnregisterDefaultConstantOutput>> unregisterDefaultConstant(
685             final UnregisterDefaultConstantInput input) {
686         return null;
687     }
688
689     @Override
690     @SuppressWarnings("checkstyle:IllegalCatch")
691     public ListenableFuture<RpcResult<UnsubscribeDdtlOutput>> unsubscribeDdtl(final UnsubscribeDdtlInput input) {
692         LOG.info("In unsubscribeDdtl");
693
694         if (idIntsDdtl == null || ddtlReg == null) {
695             return RpcResultBuilder.<UnsubscribeDdtlOutput>failed().withError(
696                     ErrorType.RPC, "data-missing", "No prior listener was registered").buildFuture();
697         }
698
699         long timeout = 120L;
700         try {
701             idIntsDdtl.tryFinishProcessing().get(timeout, TimeUnit.SECONDS);
702         } catch (InterruptedException | ExecutionException | TimeoutException e) {
703             LOG.error("Unable to finish notification processing", e);
704             return RpcResultBuilder.<UnsubscribeDdtlOutput>failed().withError(ErrorType.APPLICATION,
705                     "Unable to finish notification processing in " + timeout + " seconds", e).buildFuture();
706         }
707
708         ddtlReg.close();
709         ddtlReg = null;
710
711         if (!idIntsDdtl.hasTriggered()) {
712             return RpcResultBuilder.<UnsubscribeDdtlOutput>failed().withError(ErrorType.APPLICATION,
713                     "No notification received.", "id-ints listener has not received any notifications").buildFuture();
714         }
715
716         final String shardName = ClusterUtils.getCleanShardName(ProduceTransactionsHandler.ID_INTS_YID);
717         LOG.debug("Creating distributed datastore client for shard {}", shardName);
718
719         final ActorUtils actorUtils = configDataStore.getActorUtils();
720         final Props distributedDataStoreClientProps =
721                 SimpleDataStoreClientActor.props(actorUtils.getCurrentMemberName(),
722                         "Shard-" + shardName, actorUtils, shardName);
723
724         final ActorRef clientActor = actorSystem.actorOf(distributedDataStoreClientProps);
725         final DataStoreClient distributedDataStoreClient;
726         try {
727             distributedDataStoreClient = SimpleDataStoreClientActor
728                     .getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS);
729         } catch (RuntimeException e) {
730             LOG.error("Failed to get actor for {}", distributedDataStoreClientProps, e);
731             clientActor.tell(PoisonPill.getInstance(), noSender());
732             return RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
733                     .withError(ErrorType.APPLICATION, "Unable to create DataStoreClient for read", e).buildFuture();
734         }
735
736         final ClientLocalHistory localHistory = distributedDataStoreClient.createLocalHistory();
737         final ClientTransaction tx = localHistory.createTransaction();
738         final ListenableFuture<java.util.Optional<NormalizedNode<?, ?>>> read =
739                 tx.read(YangInstanceIdentifier.of(ProduceTransactionsHandler.ID_INT));
740
741         tx.abort();
742         localHistory.close();
743         try {
744             final java.util.Optional<NormalizedNode<?, ?>> optional = read.get();
745             if (!optional.isPresent()) {
746                 return RpcResultBuilder.<UnsubscribeDdtlOutput>failed().withError(ErrorType.APPLICATION,
747                         "data-missing", "Final read from id-ints is empty").buildFuture();
748             }
749
750             return RpcResultBuilder.success(new UnsubscribeDdtlOutputBuilder().setCopyMatches(
751                     idIntsDdtl.checkEqual(optional.get()))).buildFuture();
752
753         } catch (InterruptedException | ExecutionException e) {
754             LOG.error("Unable to read data to verify ddtl data", e);
755             return RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
756                     .withError(ErrorType.APPLICATION, "Final read from id-ints failed", e).buildFuture();
757         } finally {
758             distributedDataStoreClient.close();
759             clientActor.tell(PoisonPill.getInstance(), noSender());
760         }
761     }
762 }