2 * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.clustering.it.provider;
10 import akka.actor.ActorRef;
11 import akka.actor.ActorSystem;
12 import akka.dispatch.OnComplete;
13 import akka.pattern.Patterns;
14 import com.google.common.base.Strings;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import com.google.common.util.concurrent.SettableFuture;
18 import java.util.HashMap;
20 import java.util.Optional;
21 import java.util.concurrent.ExecutionException;
22 import java.util.concurrent.TimeUnit;
23 import java.util.concurrent.TimeoutException;
24 import org.opendaylight.controller.cluster.ActorSystemProvider;
25 import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
26 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
27 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
28 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
29 import org.opendaylight.controller.clustering.it.provider.impl.FlappingSingletonService;
30 import org.opendaylight.controller.clustering.it.provider.impl.GetConstantService;
31 import org.opendaylight.controller.clustering.it.provider.impl.IdIntsListener;
32 import org.opendaylight.controller.clustering.it.provider.impl.PublishNotificationsTask;
33 import org.opendaylight.controller.clustering.it.provider.impl.RoutedGetConstantService;
34 import org.opendaylight.controller.clustering.it.provider.impl.SingletonGetConstantService;
35 import org.opendaylight.controller.clustering.it.provider.impl.WriteTransactionsHandler;
36 import org.opendaylight.controller.clustering.it.provider.impl.YnlListener;
37 import org.opendaylight.mdsal.binding.api.NotificationPublishService;
38 import org.opendaylight.mdsal.binding.api.NotificationService;
39 import org.opendaylight.mdsal.binding.api.RpcProviderService;
40 import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
41 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
42 import org.opendaylight.mdsal.dom.api.DOMDataBroker;
43 import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
44 import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeService;
45 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
46 import org.opendaylight.mdsal.dom.api.DOMDataTreeReadTransaction;
47 import org.opendaylight.mdsal.dom.api.DOMRpcImplementationRegistration;
48 import org.opendaylight.mdsal.dom.api.DOMRpcProviderService;
49 import org.opendaylight.mdsal.dom.api.DOMSchemaService;
50 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
51 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
52 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.AddShardReplicaInput;
53 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.AddShardReplicaOutput;
54 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomePrefixLeaderInput;
55 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomePrefixLeaderOutput;
56 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsInput;
57 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutput;
58 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutputBuilder;
59 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CreatePrefixShardInput;
60 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CreatePrefixShardOutput;
61 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.DeconfigureIdIntsShardInput;
62 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.DeconfigureIdIntsShardOutput;
63 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.IsClientAbortedInput;
64 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.IsClientAbortedOutput;
65 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.OdlMdsalLowlevelControlService;
66 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsInput;
67 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutput;
68 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantInput;
69 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantOutput;
70 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantOutputBuilder;
71 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantInput;
72 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantOutput;
73 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantOutputBuilder;
74 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterDefaultConstantInput;
75 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterDefaultConstantOutput;
76 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterFlappingSingletonInput;
77 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterFlappingSingletonOutput;
78 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterFlappingSingletonOutputBuilder;
79 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantInput;
80 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantOutput;
81 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantOutputBuilder;
82 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardInput;
83 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardOutput;
84 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemoveShardReplicaInput;
85 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemoveShardReplicaOutput;
86 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownPrefixShardReplicaInput;
87 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownPrefixShardReplicaOutput;
88 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownPrefixShardReplicaOutputBuilder;
89 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownShardReplicaInput;
90 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownShardReplicaOutput;
91 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownShardReplicaOutputBuilder;
92 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsInput;
93 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsOutput;
94 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsOutputBuilder;
95 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDdtlInput;
96 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDdtlOutput;
97 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDtclInput;
98 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDtclOutput;
99 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDtclOutputBuilder;
100 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlInput;
101 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlOutput;
102 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlOutputBuilder;
103 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantInput;
104 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantOutput;
105 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantOutputBuilder;
106 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterConstantInput;
107 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterConstantOutput;
108 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterConstantOutputBuilder;
109 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterDefaultConstantInput;
110 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterDefaultConstantOutput;
111 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonInput;
112 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutput;
113 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutputBuilder;
114 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterSingletonConstantInput;
115 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterSingletonConstantOutput;
116 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterSingletonConstantOutputBuilder;
117 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlInput;
118 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutput;
119 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclInput;
120 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutput;
121 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutputBuilder;
122 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlInput;
123 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlOutput;
124 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsInput;
125 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsOutput;
126 import org.opendaylight.yangtools.concepts.ListenerRegistration;
127 import org.opendaylight.yangtools.concepts.ObjectRegistration;
128 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
129 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
130 import org.opendaylight.yangtools.yang.common.RpcResult;
131 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
132 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
133 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
134 import org.slf4j.Logger;
135 import org.slf4j.LoggerFactory;
136 import scala.concurrent.duration.FiniteDuration;
138 public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService {
139 private static final Logger LOG = LoggerFactory.getLogger(MdsalLowLevelTestProvider.class);
141 private final RpcProviderService rpcRegistry;
142 private final ObjectRegistration<OdlMdsalLowlevelControlService> registration;
143 private final DistributedDataStoreInterface configDataStore;
144 private final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer;
145 private final DOMDataBroker domDataBroker;
146 private final NotificationPublishService notificationPublishService;
147 private final NotificationService notificationService;
148 private final DOMSchemaService schemaService;
149 private final ClusterSingletonServiceProvider singletonService;
150 private final DOMRpcProviderService domRpcService;
151 private final DOMDataTreeChangeService domDataTreeChangeService;
152 private final ActorSystem actorSystem;
154 private final Map<InstanceIdentifier<?>, DOMRpcImplementationRegistration<RoutedGetConstantService>>
155 routedRegistrations = new HashMap<>();
157 private final Map<String, ListenerRegistration<YnlListener>> ynlRegistrations = new HashMap<>();
159 private DOMRpcImplementationRegistration<GetConstantService> globalGetConstantRegistration = null;
160 private ClusterSingletonServiceRegistration getSingletonConstantRegistration;
161 private FlappingSingletonService flappingSingletonService;
162 private ListenerRegistration<DOMDataTreeChangeListener> dtclReg;
163 private IdIntsListener idIntsListener;
164 private final Map<String, PublishNotificationsTask> publishNotificationsTasks = new HashMap<>();
/**
 * Stores the supplied services, resolves the {@link DOMDataTreeChangeService} extension of the DOM data broker
 * and the actor system, and finally registers this instance as the implementation of
 * {@link OdlMdsalLowlevelControlService}.
 */
public MdsalLowLevelTestProvider(final RpcProviderService rpcRegistry,
        final DOMRpcProviderService domRpcService,
        final ClusterSingletonServiceProvider singletonService,
        final DOMSchemaService schemaService,
        final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer,
        final NotificationPublishService notificationPublishService,
        final NotificationService notificationService,
        final DOMDataBroker domDataBroker,
        final DistributedDataStoreInterface configDataStore,
        final ActorSystemProvider actorSystemProvider) {
    // Plain hand-over of collaborators.
    this.rpcRegistry = rpcRegistry;
    this.domRpcService = domRpcService;
    this.singletonService = singletonService;
    this.schemaService = schemaService;
    this.bindingNormalizedNodeSerializer = bindingNormalizedNodeSerializer;
    this.notificationPublishService = notificationPublishService;
    this.notificationService = notificationService;
    this.domDataBroker = domDataBroker;
    this.configDataStore = configDataStore;

    // Derived collaborators.
    this.actorSystem = actorSystemProvider.getActorSystem();
    this.domDataTreeChangeService = domDataBroker.getExtensions().getInstance(DOMDataTreeChangeService.class);

    // Kept as the last statement: this hands out a reference to the (by now fully assigned) instance.
    this.registration = rpcRegistry.registerRpcImplementation(OdlMdsalLowlevelControlService.class, this);
}
193 @SuppressWarnings("checkstyle:IllegalCatch")
194 public ListenableFuture<RpcResult<UnregisterSingletonConstantOutput>> unregisterSingletonConstant(
195 final UnregisterSingletonConstantInput input) {
196 LOG.info("In unregisterSingletonConstant");
198 if (getSingletonConstantRegistration == null) {
199 return RpcResultBuilder.<UnregisterSingletonConstantOutput>failed().withError(ErrorType.RPC, "data-missing",
200 "No prior RPC was registered").buildFuture();
204 getSingletonConstantRegistration.close();
205 getSingletonConstantRegistration = null;
207 return RpcResultBuilder.success(new UnregisterSingletonConstantOutputBuilder().build()).buildFuture();
208 } catch (Exception e) {
209 String msg = "Error closing the singleton constant service";
211 return RpcResultBuilder.<UnregisterSingletonConstantOutput>failed().withError(
212 ErrorType.APPLICATION, msg, e).buildFuture();
/**
 * Creates a {@link PublishNotificationsTask} from the id, seconds and notifications-per-second values of the
 * input and stores it in {@code publishNotificationsTasks} under the input id.
 *
 * <p>NOTE(review): this listing jumps from original line 224 to 228. Nothing visible here starts the task;
 * the call that does so is presumably on one of the skipped lines - confirm against the complete file.
 *
 * @param input RPC input supplying the task id, duration in seconds and publishing rate
 * @return a future holding a successful result with an empty output
 */
217 public ListenableFuture<RpcResult<StartPublishNotificationsOutput>> startPublishNotifications(
218 final StartPublishNotificationsInput input) {
219 LOG.info("In startPublishNotifications - input: {}", input);
// All task parameters are taken straight from the RPC input.
221 final PublishNotificationsTask task = new PublishNotificationsTask(notificationPublishService, input.getId(),
222 input.getSeconds().toJava(), input.getNotificationsPerSecond().toJava());
// A task already stored under the same id is replaced by the new one.
224 publishNotificationsTasks.put(input.getId(), task);
228 return RpcResultBuilder.success(new StartPublishNotificationsOutputBuilder().build()).buildFuture();
232 public ListenableFuture<RpcResult<SubscribeDtclOutput>> subscribeDtcl(final SubscribeDtclInput input) {
233 LOG.info("In subscribeDtcl - input: {}", input);
235 if (dtclReg != null) {
236 return RpcResultBuilder.<SubscribeDtclOutput>failed().withError(ErrorType.RPC,
237 "data-exists", "There is already a DataTreeChangeListener registered for id-ints").buildFuture();
240 idIntsListener = new IdIntsListener();
242 dtclReg = domDataTreeChangeService.registerDataTreeChangeListener(
243 new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, WriteTransactionsHandler.ID_INT_YID),
246 return RpcResultBuilder.success(new SubscribeDtclOutputBuilder().build()).buildFuture();
250 public ListenableFuture<RpcResult<WriteTransactionsOutput>> writeTransactions(final WriteTransactionsInput input) {
251 return WriteTransactionsHandler.start(domDataBroker, input);
/**
 * Handler for the {@code isClientAborted} RPC.
 *
 * <p>NOTE(review): only the signature is present in this listing (original lines 256-258 are absent), so the
 * behaviour cannot be described from here - confirm against the complete file.
 */
255 public ListenableFuture<RpcResult<IsClientAbortedOutput>> isClientAborted(final IsClientAbortedInput input) {
/**
 * Handler for the {@code removeShardReplica} RPC.
 *
 * <p>NOTE(review): only the signature is present in this listing (original lines 262-264 are absent), so the
 * behaviour cannot be described from here - confirm against the complete file.
 */
260 public ListenableFuture<RpcResult<RemoveShardReplicaOutput>> removeShardReplica(
261 final RemoveShardReplicaInput input) {
266 public ListenableFuture<RpcResult<SubscribeYnlOutput>> subscribeYnl(final SubscribeYnlInput input) {
267 LOG.info("In subscribeYnl - input: {}", input);
269 if (ynlRegistrations.containsKey(input.getId())) {
270 return RpcResultBuilder.<SubscribeYnlOutput>failed().withError(ErrorType.RPC,
271 "data-exists", "There is already a listener registered for id: " + input.getId()).buildFuture();
274 ynlRegistrations.put(input.getId(),
275 notificationService.registerNotificationListener(new YnlListener(input.getId())));
277 return RpcResultBuilder.success(new SubscribeYnlOutputBuilder().build()).buildFuture();
281 public ListenableFuture<RpcResult<RemovePrefixShardOutput>> removePrefixShard(final RemovePrefixShardInput input) {
282 throw new UnsupportedOperationException();
286 public ListenableFuture<RpcResult<BecomePrefixLeaderOutput>> becomePrefixLeader(
287 final BecomePrefixLeaderInput input) {
288 throw new UnsupportedOperationException();
292 public ListenableFuture<RpcResult<UnregisterBoundConstantOutput>> unregisterBoundConstant(
293 final UnregisterBoundConstantInput input) {
294 LOG.info("In unregisterBoundConstant - {}", input);
296 final DOMRpcImplementationRegistration<RoutedGetConstantService> rpcRegistration =
297 routedRegistrations.remove(input.getContext());
299 if (rpcRegistration == null) {
300 return RpcResultBuilder.<UnregisterBoundConstantOutput>failed().withError(
301 ErrorType.RPC, "data-missing", "No prior RPC was registered for " + input.getContext()).buildFuture();
304 rpcRegistration.close();
305 return RpcResultBuilder.success(new UnregisterBoundConstantOutputBuilder().build()).buildFuture();
309 public ListenableFuture<RpcResult<RegisterSingletonConstantOutput>> registerSingletonConstant(
310 final RegisterSingletonConstantInput input) {
311 LOG.info("In registerSingletonConstant - input: {}", input);
313 if (input.getConstant() == null) {
314 return RpcResultBuilder.<RegisterSingletonConstantOutput>failed().withError(
315 ErrorType.RPC, "invalid-value", "Constant value is null").buildFuture();
318 getSingletonConstantRegistration =
319 SingletonGetConstantService.registerNew(singletonService, domRpcService, input.getConstant());
321 return RpcResultBuilder.success(new RegisterSingletonConstantOutputBuilder().build()).buildFuture();
/**
 * Handler for the {@code registerDefaultConstant} RPC.
 *
 * <p>NOTE(review): only the signature is present in this listing (original lines 327-329 are absent), so the
 * behaviour cannot be described from here - confirm against the complete file.
 */
325 public ListenableFuture<RpcResult<RegisterDefaultConstantOutput>> registerDefaultConstant(
326 final RegisterDefaultConstantInput input) {
331 public ListenableFuture<RpcResult<UnregisterConstantOutput>> unregisterConstant(
332 final UnregisterConstantInput input) {
333 LOG.info("In unregisterConstant");
335 if (globalGetConstantRegistration == null) {
336 return RpcResultBuilder.<UnregisterConstantOutput>failed().withError(
337 ErrorType.RPC, "data-missing", "No prior RPC was registered").buildFuture();
340 globalGetConstantRegistration.close();
341 globalGetConstantRegistration = null;
343 return Futures.immediateFuture(RpcResultBuilder.success(new UnregisterConstantOutputBuilder().build()).build());
347 public ListenableFuture<RpcResult<UnregisterFlappingSingletonOutput>> unregisterFlappingSingleton(
348 final UnregisterFlappingSingletonInput input) {
349 LOG.info("In unregisterFlappingSingleton");
351 if (flappingSingletonService == null) {
352 return RpcResultBuilder.<UnregisterFlappingSingletonOutput>failed().withError(
353 ErrorType.RPC, "data-missing", "No prior RPC was registered").buildFuture();
356 final long flapCount = flappingSingletonService.setInactive();
357 flappingSingletonService = null;
359 return RpcResultBuilder.success(new UnregisterFlappingSingletonOutputBuilder().setFlapCount(flapCount).build())
364 public ListenableFuture<RpcResult<AddShardReplicaOutput>> addShardReplica(final AddShardReplicaInput input) {
365 throw new UnsupportedOperationException();
369 public ListenableFuture<RpcResult<SubscribeDdtlOutput>> subscribeDdtl(final SubscribeDdtlInput input) {
370 throw new UnsupportedOperationException();
374 public ListenableFuture<RpcResult<RegisterBoundConstantOutput>> registerBoundConstant(
375 final RegisterBoundConstantInput input) {
376 LOG.info("In registerBoundConstant - input: {}", input);
378 if (input.getContext() == null) {
379 return RpcResultBuilder.<RegisterBoundConstantOutput>failed().withError(
380 ErrorType.RPC, "invalid-value", "Context value is null").buildFuture();
383 if (input.getConstant() == null) {
384 return RpcResultBuilder.<RegisterBoundConstantOutput>failed().withError(
385 ErrorType.RPC, "invalid-value", "Constant value is null").buildFuture();
388 if (routedRegistrations.containsKey(input.getContext())) {
389 return RpcResultBuilder.<RegisterBoundConstantOutput>failed().withError(ErrorType.RPC,
390 "data-exists", "There is already an rpc registered for context: " + input.getContext()).buildFuture();
393 final DOMRpcImplementationRegistration<RoutedGetConstantService> rpcRegistration =
394 RoutedGetConstantService.registerNew(bindingNormalizedNodeSerializer, domRpcService,
395 input.getConstant(), input.getContext());
397 routedRegistrations.put(input.getContext(), rpcRegistration);
398 return RpcResultBuilder.success(new RegisterBoundConstantOutputBuilder().build()).buildFuture();
402 public ListenableFuture<RpcResult<RegisterFlappingSingletonOutput>> registerFlappingSingleton(
403 final RegisterFlappingSingletonInput input) {
404 LOG.info("In registerFlappingSingleton");
406 if (flappingSingletonService != null) {
407 return RpcResultBuilder.<RegisterFlappingSingletonOutput>failed().withError(ErrorType.RPC,
408 "data-exists", "There is already an rpc registered").buildFuture();
411 flappingSingletonService = new FlappingSingletonService(singletonService);
413 return RpcResultBuilder.success(new RegisterFlappingSingletonOutputBuilder().build()).buildFuture();
/**
 * Finishes the id-ints data tree change listener set up by {@code subscribeDtcl} and reports whether the
 * listener's copy of the data matches a final read of {@code WriteTransactionsHandler.ID_INT_YID} from the
 * CONFIGURATION datastore.
 *
 * <p>NOTE(review): several original lines are absent from this listing (423-426, 432-436, 452, 455-456,
 * 458-459, 464 onwards). Among them must be the declaration of {@code timeout}, the opening of the first
 * {@code try} and the guard around the mismatch log; the timeout value and any other statement on those
 * lines cannot be determined from here - confirm against the complete file.
 *
 * @param input RPC input, not read in the visible lines
 * @return a result whose output has {@code copyMatches} set, or a failed result: RPC {@code data-missing} when
 *         no listener is registered, APPLICATION errors when processing does not finish, the listener never
 *         triggered, the final read yields no data or the final read fails
 */
417 public ListenableFuture<RpcResult<UnsubscribeDtclOutput>> unsubscribeDtcl(final UnsubscribeDtclInput input) {
418 LOG.info("In unsubscribeDtcl");
// Both the listener and its registration must be present.
420 if (idIntsListener == null || dtclReg == null) {
421 return RpcResultBuilder.<UnsubscribeDtclOutput>failed().withError(
422 ErrorType.RPC, "data-missing", "No prior listener was registered").buildFuture();
// Block until the listener reports that it has finished processing, for at most `timeout` seconds.
427 idIntsListener.tryFinishProcessing().get(timeout, TimeUnit.SECONDS);
// NOTE(review): InterruptedException is handled here without restoring the thread's interrupt flag.
428 } catch (InterruptedException | ExecutionException | TimeoutException e) {
429 LOG.error("Unable to finish notification processing", e);
430 return RpcResultBuilder.<UnsubscribeDtclOutput>failed().withError(ErrorType.APPLICATION,
431 "Unable to finish notification processing in " + timeout + " seconds", e).buildFuture();
// A listener that never saw a change is reported as a failure.
437 if (!idIntsListener.hasTriggered()) {
438 return RpcResultBuilder.<UnsubscribeDtclOutput>failed().withError(ErrorType.APPLICATION, "operation-failed",
439 "id-ints listener has not received any notifications.").buildFuture();
// Final read of the id-ints data; try-with-resources closes the read-only transaction.
442 try (DOMDataTreeReadTransaction rTx = domDataBroker.newReadOnlyTransaction()) {
443 final Optional<NormalizedNode> readResult = rTx.read(LogicalDatastoreType.CONFIGURATION,
444 WriteTransactionsHandler.ID_INT_YID).get();
446 if (!readResult.isPresent()) {
447 return RpcResultBuilder.<UnsubscribeDtclOutput>failed().withError(ErrorType.APPLICATION, "data-missing",
448 "No data read from id-ints list").buildFuture();
// Compare the datastore content with what the listener accumulated.
451 final boolean nodesEqual = idIntsListener.checkEqual(readResult.get());
// Logs the difference between the two copies (presumably only on mismatch; the guarding line is not visible).
453 LOG.error("Final read of id-int does not match IdIntsListener's copy. {}",
454 idIntsListener.diffWithLocalCopy(readResult.get()));
// The comparison outcome is returned in a successful result rather than as an error.
457 return RpcResultBuilder.success(new UnsubscribeDtclOutputBuilder().setCopyMatches(nodesEqual))
// NOTE(review): as above, the interrupt flag is not restored when InterruptedException is caught.
460 } catch (final InterruptedException | ExecutionException e) {
461 LOG.error("Final read of id-ints failed", e);
462 return RpcResultBuilder.<UnsubscribeDtclOutput>failed().withError(ErrorType.APPLICATION,
463 "Final read of id-ints failed", e).buildFuture();
468 public ListenableFuture<RpcResult<CreatePrefixShardOutput>> createPrefixShard(final CreatePrefixShardInput input) {
469 throw new UnsupportedOperationException();
/**
 * Handler for the {@code deconfigureIdIntsShard} RPC.
 *
 * <p>NOTE(review): only the signature is present in this listing (original lines 475-477 are absent), so the
 * behaviour cannot be described from here - confirm against the complete file.
 */
473 public ListenableFuture<RpcResult<DeconfigureIdIntsShardOutput>> deconfigureIdIntsShard(
474 final DeconfigureIdIntsShardInput input) {
479 public ListenableFuture<RpcResult<UnsubscribeYnlOutput>> unsubscribeYnl(final UnsubscribeYnlInput input) {
480 LOG.info("In unsubscribeYnl - input: {}", input);
482 if (!ynlRegistrations.containsKey(input.getId())) {
483 return RpcResultBuilder.<UnsubscribeYnlOutput>failed().withError(
484 ErrorType.RPC, "data-missing", "No prior listener was registered for " + input.getId()).buildFuture();
487 final ListenerRegistration<YnlListener> reg = ynlRegistrations.remove(input.getId());
488 final UnsubscribeYnlOutput output = reg.getInstance().getOutput();
492 return RpcResultBuilder.<UnsubscribeYnlOutput>success().withResult(output).buildFuture();
496 public ListenableFuture<RpcResult<CheckPublishNotificationsOutput>> checkPublishNotifications(
497 final CheckPublishNotificationsInput input) {
498 LOG.info("In checkPublishNotifications - input: {}", input);
500 final PublishNotificationsTask task = publishNotificationsTasks.get(input.getId());
503 return Futures.immediateFuture(RpcResultBuilder.success(
504 new CheckPublishNotificationsOutputBuilder().setActive(false)).build());
507 final CheckPublishNotificationsOutputBuilder checkPublishNotificationsOutputBuilder =
508 new CheckPublishNotificationsOutputBuilder().setActive(!task.isFinished());
510 if (task.getLastError() != null) {
511 LOG.error("Last error for {}", task, task.getLastError());
512 checkPublishNotificationsOutputBuilder.setLastError(task.getLastError().toString());
515 final CheckPublishNotificationsOutput output =
516 checkPublishNotificationsOutputBuilder.setPublishCount(task.getCurrentNotif()).build();
518 return RpcResultBuilder.success(output).buildFuture();
522 public ListenableFuture<RpcResult<ProduceTransactionsOutput>> produceTransactions(
523 final ProduceTransactionsInput input) {
524 throw new UnsupportedOperationException();
528 public ListenableFuture<RpcResult<ShutdownShardReplicaOutput>> shutdownShardReplica(
529 final ShutdownShardReplicaInput input) {
530 LOG.info("In shutdownShardReplica - input: {}", input);
532 final String shardName = input.getShardName();
533 if (Strings.isNullOrEmpty(shardName)) {
534 return RpcResultBuilder.<ShutdownShardReplicaOutput>failed().withError(ErrorType.RPC, "bad-element",
535 shardName + "is not a valid shard name").buildFuture();
538 return shutdownShardGracefully(shardName, new ShutdownShardReplicaOutputBuilder().build());
542 public ListenableFuture<RpcResult<ShutdownPrefixShardReplicaOutput>> shutdownPrefixShardReplica(
543 final ShutdownPrefixShardReplicaInput input) {
544 LOG.info("shutdownPrefixShardReplica - input: {}", input);
546 final InstanceIdentifier<?> shardPrefix = input.getPrefix();
548 if (shardPrefix == null) {
549 return RpcResultBuilder.<ShutdownPrefixShardReplicaOutput>failed().withError(ErrorType.RPC, "bad-element",
550 "A valid shard prefix must be specified").buildFuture();
553 final YangInstanceIdentifier shardPath = bindingNormalizedNodeSerializer.toYangInstanceIdentifier(shardPrefix);
554 final String cleanPrefixShardName = ClusterUtils.getCleanShardName(shardPath);
556 return shutdownShardGracefully(cleanPrefixShardName, new ShutdownPrefixShardReplicaOutputBuilder().build());
559 private <T> SettableFuture<RpcResult<T>> shutdownShardGracefully(final String shardName, final T success) {
560 final SettableFuture<RpcResult<T>> rpcResult = SettableFuture.create();
561 final ActorUtils context = configDataStore.getActorUtils();
563 long timeoutInMS = Math.max(context.getDatastoreContext().getShardRaftConfig()
564 .getElectionTimeOutInterval().$times(3).toMillis(), 10000);
565 final FiniteDuration duration = FiniteDuration.apply(timeoutInMS, TimeUnit.MILLISECONDS);
566 final scala.concurrent.Promise<Boolean> shutdownShardAsk = akka.dispatch.Futures.promise();
568 context.findLocalShardAsync(shardName).onComplete(new OnComplete<ActorRef>() {
570 public void onComplete(final Throwable throwable, final ActorRef actorRef) {
571 if (throwable != null) {
572 shutdownShardAsk.failure(throwable);
574 shutdownShardAsk.completeWith(Patterns.gracefulStop(actorRef, duration, Shutdown.INSTANCE));
577 }, context.getClientDispatcher());
579 shutdownShardAsk.future().onComplete(new OnComplete<Boolean>() {
581 public void onComplete(final Throwable throwable, final Boolean gracefulStopResult) {
582 if (throwable != null) {
583 final RpcResult<T> failedResult = RpcResultBuilder.<T>failed()
584 .withError(ErrorType.APPLICATION, "Failed to gracefully shutdown shard", throwable).build();
585 rpcResult.set(failedResult);
587 // according to Patterns.gracefulStop API, we don't have to
588 // check value of gracefulStopResult
589 rpcResult.set(RpcResultBuilder.success(success).build());
592 }, context.getClientDispatcher());
597 public ListenableFuture<RpcResult<RegisterConstantOutput>> registerConstant(final RegisterConstantInput input) {
598 LOG.info("In registerConstant - input: {}", input);
600 if (input.getConstant() == null) {
601 return RpcResultBuilder.<RegisterConstantOutput>failed().withError(
602 ErrorType.RPC, "invalid-value", "Constant value is null").buildFuture();
605 if (globalGetConstantRegistration != null) {
606 return RpcResultBuilder.<RegisterConstantOutput>failed().withError(ErrorType.RPC,
607 "data-exists", "There is already an rpc registered").buildFuture();
610 globalGetConstantRegistration = GetConstantService.registerNew(domRpcService, input.getConstant());
611 return RpcResultBuilder.success(new RegisterConstantOutputBuilder().build()).buildFuture();
615 public ListenableFuture<RpcResult<UnregisterDefaultConstantOutput>> unregisterDefaultConstant(
616 final UnregisterDefaultConstantInput input) {
617 throw new UnsupportedOperationException();
621 @SuppressWarnings("checkstyle:IllegalCatch")
622 public ListenableFuture<RpcResult<UnsubscribeDdtlOutput>> unsubscribeDdtl(final UnsubscribeDdtlInput input) {
623 throw new UnsupportedOperationException();