2 * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.clustering.it.provider;
10 import akka.actor.ActorRef;
11 import akka.actor.ActorSystem;
12 import akka.dispatch.OnComplete;
13 import akka.pattern.Patterns;
14 import com.google.common.base.Strings;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import com.google.common.util.concurrent.SettableFuture;
18 import java.util.HashMap;
20 import java.util.Optional;
21 import java.util.concurrent.ExecutionException;
22 import java.util.concurrent.TimeUnit;
23 import java.util.concurrent.TimeoutException;
24 import org.opendaylight.controller.cluster.ActorSystemProvider;
25 import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
26 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
27 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
28 import org.opendaylight.controller.clustering.it.provider.impl.FlappingSingletonService;
29 import org.opendaylight.controller.clustering.it.provider.impl.GetConstantService;
30 import org.opendaylight.controller.clustering.it.provider.impl.IdIntsListener;
31 import org.opendaylight.controller.clustering.it.provider.impl.PublishNotificationsTask;
32 import org.opendaylight.controller.clustering.it.provider.impl.RoutedGetConstantService;
33 import org.opendaylight.controller.clustering.it.provider.impl.SingletonGetConstantService;
34 import org.opendaylight.controller.clustering.it.provider.impl.WriteTransactionsHandler;
35 import org.opendaylight.controller.clustering.it.provider.impl.YnlListener;
36 import org.opendaylight.mdsal.binding.api.NotificationPublishService;
37 import org.opendaylight.mdsal.binding.api.NotificationService;
38 import org.opendaylight.mdsal.binding.api.RpcProviderService;
39 import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
40 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
41 import org.opendaylight.mdsal.dom.api.DOMDataBroker;
42 import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
43 import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeService;
44 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
45 import org.opendaylight.mdsal.dom.api.DOMDataTreeReadTransaction;
46 import org.opendaylight.mdsal.dom.api.DOMRpcImplementationRegistration;
47 import org.opendaylight.mdsal.dom.api.DOMRpcProviderService;
48 import org.opendaylight.mdsal.dom.api.DOMSchemaService;
49 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
50 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
51 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.AddShardReplicaInput;
52 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.AddShardReplicaOutput;
53 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsInput;
54 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutput;
55 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutputBuilder;
56 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.IsClientAbortedInput;
57 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.IsClientAbortedOutput;
58 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.OdlMdsalLowlevelControlService;
59 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantInput;
60 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantOutput;
61 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantOutputBuilder;
62 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantInput;
63 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantOutput;
64 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantOutputBuilder;
65 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterDefaultConstantInput;
66 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterDefaultConstantOutput;
67 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterFlappingSingletonInput;
68 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterFlappingSingletonOutput;
69 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterFlappingSingletonOutputBuilder;
70 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantInput;
71 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantOutput;
72 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantOutputBuilder;
73 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemoveShardReplicaInput;
74 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemoveShardReplicaOutput;
75 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownShardReplicaInput;
76 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownShardReplicaOutput;
77 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownShardReplicaOutputBuilder;
78 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsInput;
79 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsOutput;
80 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsOutputBuilder;
81 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDdtlInput;
82 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDdtlOutput;
83 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDtclInput;
84 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDtclOutput;
85 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDtclOutputBuilder;
86 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlInput;
87 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlOutput;
88 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlOutputBuilder;
89 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantInput;
90 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantOutput;
91 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantOutputBuilder;
92 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterConstantInput;
93 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterConstantOutput;
94 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterConstantOutputBuilder;
95 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterDefaultConstantInput;
96 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterDefaultConstantOutput;
97 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonInput;
98 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutput;
99 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutputBuilder;
100 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterSingletonConstantInput;
101 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterSingletonConstantOutput;
102 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterSingletonConstantOutputBuilder;
103 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlInput;
104 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutput;
105 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclInput;
106 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutput;
107 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutputBuilder;
108 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlInput;
109 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlOutput;
110 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsInput;
111 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsOutput;
112 import org.opendaylight.yangtools.concepts.ListenerRegistration;
113 import org.opendaylight.yangtools.concepts.ObjectRegistration;
114 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
115 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
116 import org.opendaylight.yangtools.yang.common.RpcResult;
117 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
118 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
119 import org.slf4j.Logger;
120 import org.slf4j.LoggerFactory;
121 import scala.concurrent.duration.FiniteDuration;
123 public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService {
124 private static final Logger LOG = LoggerFactory.getLogger(MdsalLowLevelTestProvider.class);
126 private final RpcProviderService rpcRegistry;
127 private final ObjectRegistration<OdlMdsalLowlevelControlService> registration;
128 private final DistributedDataStoreInterface configDataStore;
129 private final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer;
130 private final DOMDataBroker domDataBroker;
131 private final NotificationPublishService notificationPublishService;
132 private final NotificationService notificationService;
133 private final DOMSchemaService schemaService;
134 private final ClusterSingletonServiceProvider singletonService;
135 private final DOMRpcProviderService domRpcService;
136 private final DOMDataTreeChangeService domDataTreeChangeService;
137 private final ActorSystem actorSystem;
139 private final Map<InstanceIdentifier<?>, DOMRpcImplementationRegistration<RoutedGetConstantService>>
140 routedRegistrations = new HashMap<>();
142 private final Map<String, ListenerRegistration<YnlListener>> ynlRegistrations = new HashMap<>();
144 private DOMRpcImplementationRegistration<GetConstantService> globalGetConstantRegistration = null;
145 private ClusterSingletonServiceRegistration getSingletonConstantRegistration;
146 private FlappingSingletonService flappingSingletonService;
147 private ListenerRegistration<DOMDataTreeChangeListener> dtclReg;
148 private IdIntsListener idIntsListener;
149 private final Map<String, PublishNotificationsTask> publishNotificationsTasks = new HashMap<>();
/**
 * Stores the supplied services, resolves the actor system and the data tree change service, and
 * finally registers this instance as the {@link OdlMdsalLowlevelControlService} implementation.
 *
 * @param rpcRegistry registry this instance registers itself with
 * @param domRpcService DOM RPC provider service used by the get-constant registrations
 * @param singletonService cluster singleton service provider
 * @param schemaService DOM schema service
 * @param bindingNormalizedNodeSerializer binding/normalized-node serializer
 * @param notificationPublishService service used to publish notifications
 * @param notificationService service used to register notification listeners
 * @param domDataBroker DOM data broker, also the source of the {@link DOMDataTreeChangeService}
 * @param configDataStore configuration data store
 * @param actorSystemProvider provider of the actor system
 */
public MdsalLowLevelTestProvider(final RpcProviderService rpcRegistry,
                                 final DOMRpcProviderService domRpcService,
                                 final ClusterSingletonServiceProvider singletonService,
                                 final DOMSchemaService schemaService,
                                 final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer,
                                 final NotificationPublishService notificationPublishService,
                                 final NotificationService notificationService,
                                 final DOMDataBroker domDataBroker,
                                 final DistributedDataStoreInterface configDataStore,
                                 final ActorSystemProvider actorSystemProvider) {
    // Plain hand-offs of the injected collaborators.
    this.configDataStore = configDataStore;
    this.domDataBroker = domDataBroker;
    this.notificationService = notificationService;
    this.notificationPublishService = notificationPublishService;
    this.bindingNormalizedNodeSerializer = bindingNormalizedNodeSerializer;
    this.schemaService = schemaService;
    this.singletonService = singletonService;
    this.domRpcService = domRpcService;
    this.rpcRegistry = rpcRegistry;

    // Derived values.
    this.actorSystem = actorSystemProvider.getActorSystem();
    this.domDataTreeChangeService = domDataBroker.getExtensions().getInstance(DOMDataTreeChangeService.class);

    // Kept as the last step so every field is assigned before this instance is handed out.
    this.registration = rpcRegistry.registerRpcImplementation(OdlMdsalLowlevelControlService.class, this);
}
178 @SuppressWarnings("checkstyle:IllegalCatch")
179 public ListenableFuture<RpcResult<UnregisterSingletonConstantOutput>> unregisterSingletonConstant(
180 final UnregisterSingletonConstantInput input) {
181 LOG.info("In unregisterSingletonConstant");
183 if (getSingletonConstantRegistration == null) {
184 return RpcResultBuilder.<UnregisterSingletonConstantOutput>failed().withError(ErrorType.RPC, "data-missing",
185 "No prior RPC was registered").buildFuture();
189 getSingletonConstantRegistration.close();
190 getSingletonConstantRegistration = null;
192 return RpcResultBuilder.success(new UnregisterSingletonConstantOutputBuilder().build()).buildFuture();
193 } catch (Exception e) {
194 String msg = "Error closing the singleton constant service";
196 return RpcResultBuilder.<UnregisterSingletonConstantOutput>failed().withError(
197 ErrorType.APPLICATION, msg, e).buildFuture();
202 public ListenableFuture<RpcResult<StartPublishNotificationsOutput>> startPublishNotifications(
203 final StartPublishNotificationsInput input) {
204 LOG.info("In startPublishNotifications - input: {}", input);
206 final PublishNotificationsTask task = new PublishNotificationsTask(notificationPublishService, input.getId(),
207 input.getSeconds().toJava(), input.getNotificationsPerSecond().toJava());
209 publishNotificationsTasks.put(input.getId(), task);
213 return RpcResultBuilder.success(new StartPublishNotificationsOutputBuilder().build()).buildFuture();
217 public ListenableFuture<RpcResult<SubscribeDtclOutput>> subscribeDtcl(final SubscribeDtclInput input) {
218 LOG.info("In subscribeDtcl - input: {}", input);
220 if (dtclReg != null) {
221 return RpcResultBuilder.<SubscribeDtclOutput>failed().withError(ErrorType.RPC,
222 "data-exists", "There is already a DataTreeChangeListener registered for id-ints").buildFuture();
225 idIntsListener = new IdIntsListener();
227 dtclReg = domDataTreeChangeService.registerDataTreeChangeListener(
228 new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, WriteTransactionsHandler.ID_INT_YID),
231 return RpcResultBuilder.success(new SubscribeDtclOutputBuilder().build()).buildFuture();
235 public ListenableFuture<RpcResult<WriteTransactionsOutput>> writeTransactions(final WriteTransactionsInput input) {
236 return WriteTransactionsHandler.start(domDataBroker, input);
240 public ListenableFuture<RpcResult<IsClientAbortedOutput>> isClientAborted(final IsClientAbortedInput input) {
245 public ListenableFuture<RpcResult<RemoveShardReplicaOutput>> removeShardReplica(
246 final RemoveShardReplicaInput input) {
251 public ListenableFuture<RpcResult<SubscribeYnlOutput>> subscribeYnl(final SubscribeYnlInput input) {
252 LOG.info("In subscribeYnl - input: {}", input);
254 if (ynlRegistrations.containsKey(input.getId())) {
255 return RpcResultBuilder.<SubscribeYnlOutput>failed().withError(ErrorType.RPC,
256 "data-exists", "There is already a listener registered for id: " + input.getId()).buildFuture();
259 ynlRegistrations.put(input.getId(),
260 notificationService.registerNotificationListener(new YnlListener(input.getId())));
262 return RpcResultBuilder.success(new SubscribeYnlOutputBuilder().build()).buildFuture();
267 public ListenableFuture<RpcResult<UnregisterBoundConstantOutput>> unregisterBoundConstant(
268 final UnregisterBoundConstantInput input) {
269 LOG.info("In unregisterBoundConstant - {}", input);
271 final DOMRpcImplementationRegistration<RoutedGetConstantService> rpcRegistration =
272 routedRegistrations.remove(input.getContext());
274 if (rpcRegistration == null) {
275 return RpcResultBuilder.<UnregisterBoundConstantOutput>failed().withError(
276 ErrorType.RPC, "data-missing", "No prior RPC was registered for " + input.getContext()).buildFuture();
279 rpcRegistration.close();
280 return RpcResultBuilder.success(new UnregisterBoundConstantOutputBuilder().build()).buildFuture();
284 public ListenableFuture<RpcResult<RegisterSingletonConstantOutput>> registerSingletonConstant(
285 final RegisterSingletonConstantInput input) {
286 LOG.info("In registerSingletonConstant - input: {}", input);
288 if (input.getConstant() == null) {
289 return RpcResultBuilder.<RegisterSingletonConstantOutput>failed().withError(
290 ErrorType.RPC, "invalid-value", "Constant value is null").buildFuture();
293 getSingletonConstantRegistration =
294 SingletonGetConstantService.registerNew(singletonService, domRpcService, input.getConstant());
296 return RpcResultBuilder.success(new RegisterSingletonConstantOutputBuilder().build()).buildFuture();
300 public ListenableFuture<RpcResult<RegisterDefaultConstantOutput>> registerDefaultConstant(
301 final RegisterDefaultConstantInput input) {
306 public ListenableFuture<RpcResult<UnregisterConstantOutput>> unregisterConstant(
307 final UnregisterConstantInput input) {
308 LOG.info("In unregisterConstant");
310 if (globalGetConstantRegistration == null) {
311 return RpcResultBuilder.<UnregisterConstantOutput>failed().withError(
312 ErrorType.RPC, "data-missing", "No prior RPC was registered").buildFuture();
315 globalGetConstantRegistration.close();
316 globalGetConstantRegistration = null;
318 return Futures.immediateFuture(RpcResultBuilder.success(new UnregisterConstantOutputBuilder().build()).build());
322 public ListenableFuture<RpcResult<UnregisterFlappingSingletonOutput>> unregisterFlappingSingleton(
323 final UnregisterFlappingSingletonInput input) {
324 LOG.info("In unregisterFlappingSingleton");
326 if (flappingSingletonService == null) {
327 return RpcResultBuilder.<UnregisterFlappingSingletonOutput>failed().withError(
328 ErrorType.RPC, "data-missing", "No prior RPC was registered").buildFuture();
331 final long flapCount = flappingSingletonService.setInactive();
332 flappingSingletonService = null;
334 return RpcResultBuilder.success(new UnregisterFlappingSingletonOutputBuilder().setFlapCount(flapCount).build())
339 public ListenableFuture<RpcResult<AddShardReplicaOutput>> addShardReplica(final AddShardReplicaInput input) {
340 throw new UnsupportedOperationException();
344 public ListenableFuture<RpcResult<SubscribeDdtlOutput>> subscribeDdtl(final SubscribeDdtlInput input) {
345 throw new UnsupportedOperationException();
349 public ListenableFuture<RpcResult<RegisterBoundConstantOutput>> registerBoundConstant(
350 final RegisterBoundConstantInput input) {
351 LOG.info("In registerBoundConstant - input: {}", input);
353 if (input.getContext() == null) {
354 return RpcResultBuilder.<RegisterBoundConstantOutput>failed().withError(
355 ErrorType.RPC, "invalid-value", "Context value is null").buildFuture();
358 if (input.getConstant() == null) {
359 return RpcResultBuilder.<RegisterBoundConstantOutput>failed().withError(
360 ErrorType.RPC, "invalid-value", "Constant value is null").buildFuture();
363 if (routedRegistrations.containsKey(input.getContext())) {
364 return RpcResultBuilder.<RegisterBoundConstantOutput>failed().withError(ErrorType.RPC,
365 "data-exists", "There is already an rpc registered for context: " + input.getContext()).buildFuture();
368 final DOMRpcImplementationRegistration<RoutedGetConstantService> rpcRegistration =
369 RoutedGetConstantService.registerNew(bindingNormalizedNodeSerializer, domRpcService,
370 input.getConstant(), input.getContext());
372 routedRegistrations.put(input.getContext(), rpcRegistration);
373 return RpcResultBuilder.success(new RegisterBoundConstantOutputBuilder().build()).buildFuture();
377 public ListenableFuture<RpcResult<RegisterFlappingSingletonOutput>> registerFlappingSingleton(
378 final RegisterFlappingSingletonInput input) {
379 LOG.info("In registerFlappingSingleton");
381 if (flappingSingletonService != null) {
382 return RpcResultBuilder.<RegisterFlappingSingletonOutput>failed().withError(ErrorType.RPC,
383 "data-exists", "There is already an rpc registered").buildFuture();
386 flappingSingletonService = new FlappingSingletonService(singletonService);
388 return RpcResultBuilder.success(new RegisterFlappingSingletonOutputBuilder().build()).buildFuture();
392 public ListenableFuture<RpcResult<UnsubscribeDtclOutput>> unsubscribeDtcl(final UnsubscribeDtclInput input) {
393 LOG.info("In unsubscribeDtcl");
395 if (idIntsListener == null || dtclReg == null) {
396 return RpcResultBuilder.<UnsubscribeDtclOutput>failed().withError(
397 ErrorType.RPC, "data-missing", "No prior listener was registered").buildFuture();
402 idIntsListener.tryFinishProcessing().get(timeout, TimeUnit.SECONDS);
403 } catch (InterruptedException | ExecutionException | TimeoutException e) {
404 LOG.error("Unable to finish notification processing", e);
405 return RpcResultBuilder.<UnsubscribeDtclOutput>failed().withError(ErrorType.APPLICATION,
406 "Unable to finish notification processing in " + timeout + " seconds", e).buildFuture();
412 if (!idIntsListener.hasTriggered()) {
413 return RpcResultBuilder.<UnsubscribeDtclOutput>failed().withError(ErrorType.APPLICATION, "operation-failed",
414 "id-ints listener has not received any notifications.").buildFuture();
417 try (DOMDataTreeReadTransaction rTx = domDataBroker.newReadOnlyTransaction()) {
418 final Optional<NormalizedNode> readResult = rTx.read(LogicalDatastoreType.CONFIGURATION,
419 WriteTransactionsHandler.ID_INT_YID).get();
421 if (!readResult.isPresent()) {
422 return RpcResultBuilder.<UnsubscribeDtclOutput>failed().withError(ErrorType.APPLICATION, "data-missing",
423 "No data read from id-ints list").buildFuture();
426 final boolean nodesEqual = idIntsListener.checkEqual(readResult.get());
428 LOG.error("Final read of id-int does not match IdIntsListener's copy. {}",
429 idIntsListener.diffWithLocalCopy(readResult.get()));
432 return RpcResultBuilder.success(new UnsubscribeDtclOutputBuilder().setCopyMatches(nodesEqual))
435 } catch (final InterruptedException | ExecutionException e) {
436 LOG.error("Final read of id-ints failed", e);
437 return RpcResultBuilder.<UnsubscribeDtclOutput>failed().withError(ErrorType.APPLICATION,
438 "Final read of id-ints failed", e).buildFuture();
443 public ListenableFuture<RpcResult<UnsubscribeYnlOutput>> unsubscribeYnl(final UnsubscribeYnlInput input) {
444 LOG.info("In unsubscribeYnl - input: {}", input);
446 if (!ynlRegistrations.containsKey(input.getId())) {
447 return RpcResultBuilder.<UnsubscribeYnlOutput>failed().withError(
448 ErrorType.RPC, "data-missing", "No prior listener was registered for " + input.getId()).buildFuture();
451 final ListenerRegistration<YnlListener> reg = ynlRegistrations.remove(input.getId());
452 final UnsubscribeYnlOutput output = reg.getInstance().getOutput();
456 return RpcResultBuilder.<UnsubscribeYnlOutput>success().withResult(output).buildFuture();
460 public ListenableFuture<RpcResult<CheckPublishNotificationsOutput>> checkPublishNotifications(
461 final CheckPublishNotificationsInput input) {
462 LOG.info("In checkPublishNotifications - input: {}", input);
464 final PublishNotificationsTask task = publishNotificationsTasks.get(input.getId());
467 return Futures.immediateFuture(RpcResultBuilder.success(
468 new CheckPublishNotificationsOutputBuilder().setActive(false)).build());
471 final CheckPublishNotificationsOutputBuilder checkPublishNotificationsOutputBuilder =
472 new CheckPublishNotificationsOutputBuilder().setActive(!task.isFinished());
474 if (task.getLastError() != null) {
475 LOG.error("Last error for {}", task, task.getLastError());
476 checkPublishNotificationsOutputBuilder.setLastError(task.getLastError().toString());
479 final CheckPublishNotificationsOutput output =
480 checkPublishNotificationsOutputBuilder.setPublishCount(task.getCurrentNotif()).build();
482 return RpcResultBuilder.success(output).buildFuture();
486 public ListenableFuture<RpcResult<ShutdownShardReplicaOutput>> shutdownShardReplica(
487 final ShutdownShardReplicaInput input) {
488 LOG.info("In shutdownShardReplica - input: {}", input);
490 final String shardName = input.getShardName();
491 if (Strings.isNullOrEmpty(shardName)) {
492 return RpcResultBuilder.<ShutdownShardReplicaOutput>failed().withError(ErrorType.RPC, "bad-element",
493 shardName + "is not a valid shard name").buildFuture();
496 return shutdownShardGracefully(shardName, new ShutdownShardReplicaOutputBuilder().build());
499 private <T> SettableFuture<RpcResult<T>> shutdownShardGracefully(final String shardName, final T success) {
500 final SettableFuture<RpcResult<T>> rpcResult = SettableFuture.create();
501 final ActorUtils context = configDataStore.getActorUtils();
503 long timeoutInMS = Math.max(context.getDatastoreContext().getShardRaftConfig()
504 .getElectionTimeOutInterval().$times(3).toMillis(), 10000);
505 final FiniteDuration duration = FiniteDuration.apply(timeoutInMS, TimeUnit.MILLISECONDS);
506 final scala.concurrent.Promise<Boolean> shutdownShardAsk = akka.dispatch.Futures.promise();
508 context.findLocalShardAsync(shardName).onComplete(new OnComplete<ActorRef>() {
510 public void onComplete(final Throwable throwable, final ActorRef actorRef) {
511 if (throwable != null) {
512 shutdownShardAsk.failure(throwable);
514 shutdownShardAsk.completeWith(Patterns.gracefulStop(actorRef, duration, Shutdown.INSTANCE));
517 }, context.getClientDispatcher());
519 shutdownShardAsk.future().onComplete(new OnComplete<Boolean>() {
521 public void onComplete(final Throwable throwable, final Boolean gracefulStopResult) {
522 if (throwable != null) {
523 final RpcResult<T> failedResult = RpcResultBuilder.<T>failed()
524 .withError(ErrorType.APPLICATION, "Failed to gracefully shutdown shard", throwable).build();
525 rpcResult.set(failedResult);
527 // according to Patterns.gracefulStop API, we don't have to
528 // check value of gracefulStopResult
529 rpcResult.set(RpcResultBuilder.success(success).build());
532 }, context.getClientDispatcher());
537 public ListenableFuture<RpcResult<RegisterConstantOutput>> registerConstant(final RegisterConstantInput input) {
538 LOG.info("In registerConstant - input: {}", input);
540 if (input.getConstant() == null) {
541 return RpcResultBuilder.<RegisterConstantOutput>failed().withError(
542 ErrorType.RPC, "invalid-value", "Constant value is null").buildFuture();
545 if (globalGetConstantRegistration != null) {
546 return RpcResultBuilder.<RegisterConstantOutput>failed().withError(ErrorType.RPC,
547 "data-exists", "There is already an rpc registered").buildFuture();
550 globalGetConstantRegistration = GetConstantService.registerNew(domRpcService, input.getConstant());
551 return RpcResultBuilder.success(new RegisterConstantOutputBuilder().build()).buildFuture();
555 public ListenableFuture<RpcResult<UnregisterDefaultConstantOutput>> unregisterDefaultConstant(
556 final UnregisterDefaultConstantInput input) {
557 throw new UnsupportedOperationException();
561 @SuppressWarnings("checkstyle:IllegalCatch")
562 public ListenableFuture<RpcResult<UnsubscribeDdtlOutput>> unsubscribeDdtl(final UnsubscribeDdtlInput input) {
563 throw new UnsupportedOperationException();