2 * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.clustering.it.provider;
11 import static akka.actor.ActorRef.noSender;
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSystem;
15 import akka.actor.PoisonPill;
16 import akka.actor.Props;
17 import akka.dispatch.OnComplete;
18 import akka.pattern.Patterns;
19 import com.google.common.base.Optional;
20 import com.google.common.base.Strings;
21 import com.google.common.util.concurrent.CheckedFuture;
22 import com.google.common.util.concurrent.Futures;
23 import com.google.common.util.concurrent.SettableFuture;
24 import java.util.Collections;
25 import java.util.HashMap;
27 import java.util.concurrent.ExecutionException;
28 import java.util.concurrent.Future;
29 import java.util.concurrent.TimeUnit;
30 import java.util.concurrent.TimeoutException;
31 import org.opendaylight.controller.cluster.ActorSystemProvider;
32 import org.opendaylight.controller.cluster.databroker.actors.dds.ClientLocalHistory;
33 import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
34 import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
35 import org.opendaylight.controller.cluster.databroker.actors.dds.SimpleDataStoreClientActor;
36 import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
37 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
38 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
39 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
40 import org.opendaylight.controller.cluster.sharding.DistributedShardFactory;
41 import org.opendaylight.controller.clustering.it.provider.impl.FlappingSingletonService;
42 import org.opendaylight.controller.clustering.it.provider.impl.GetConstantService;
43 import org.opendaylight.controller.clustering.it.provider.impl.IdIntsDOMDataTreeLIstener;
44 import org.opendaylight.controller.clustering.it.provider.impl.IdIntsListener;
45 import org.opendaylight.controller.clustering.it.provider.impl.PrefixLeaderHandler;
46 import org.opendaylight.controller.clustering.it.provider.impl.PrefixShardHandler;
47 import org.opendaylight.controller.clustering.it.provider.impl.ProduceTransactionsHandler;
48 import org.opendaylight.controller.clustering.it.provider.impl.PublishNotificationsTask;
49 import org.opendaylight.controller.clustering.it.provider.impl.RoutedGetConstantService;
50 import org.opendaylight.controller.clustering.it.provider.impl.SingletonGetConstantService;
51 import org.opendaylight.controller.clustering.it.provider.impl.WriteTransactionsHandler;
52 import org.opendaylight.controller.clustering.it.provider.impl.YnlListener;
53 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
54 import org.opendaylight.controller.md.sal.binding.api.NotificationService;
55 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
56 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
57 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
58 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
59 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeService;
60 import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationRegistration;
61 import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService;
62 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
63 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
64 import org.opendaylight.controller.sal.core.api.model.SchemaService;
65 import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
66 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
67 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
68 import org.opendaylight.mdsal.dom.api.DOMDataTreeLoopException;
69 import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
70 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
71 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
72 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.AddShardReplicaInput;
73 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomePrefixLeaderInput;
74 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsInput;
75 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutput;
76 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutputBuilder;
77 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CreatePrefixShardInput;
78 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.IsClientAbortedOutput;
79 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.OdlMdsalLowlevelControlService;
80 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsInput;
81 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutput;
82 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantInput;
83 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantInput;
84 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterDefaultConstantInput;
85 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantInput;
86 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardInput;
87 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemoveShardReplicaInput;
88 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownPrefixShardReplicaInput;
89 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownShardReplicaInput;
90 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsInput;
91 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlInput;
92 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantInput;
93 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutput;
94 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutputBuilder;
95 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutput;
96 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutputBuilder;
97 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutput;
98 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutputBuilder;
99 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlInput;
100 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlOutput;
101 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsInput;
102 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsOutput;
103 import org.opendaylight.yangtools.concepts.ListenerRegistration;
104 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
105 import org.opendaylight.yangtools.yang.common.RpcError;
106 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
107 import org.opendaylight.yangtools.yang.common.RpcResult;
108 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
109 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
110 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
111 import org.slf4j.Logger;
112 import org.slf4j.LoggerFactory;
113 import scala.concurrent.duration.FiniteDuration;
115 public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService {
117 private static final Logger LOG = LoggerFactory.getLogger(MdsalLowLevelTestProvider.class);
118 private static final org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType CONTROLLER_CONFIG =
119 org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;
121 private final RpcProviderRegistry rpcRegistry;
122 private final BindingAwareBroker.RpcRegistration<OdlMdsalLowlevelControlService> registration;
123 private final DistributedShardFactory distributedShardFactory;
124 private final DistributedDataStoreInterface configDataStore;
125 private final DOMDataTreeService domDataTreeService;
126 private final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer;
127 private final DOMDataBroker domDataBroker;
128 private final NotificationPublishService notificationPublishService;
129 private final NotificationService notificationService;
130 private final SchemaService schemaService;
131 private final ClusterSingletonServiceProvider singletonService;
132 private final DOMRpcProviderService domRpcService;
133 private final PrefixLeaderHandler prefixLeaderHandler;
134 private final PrefixShardHandler prefixShardHandler;
135 private final DOMDataTreeChangeService domDataTreeChangeService;
136 private final ActorSystem actorSystem;
138 private final Map<InstanceIdentifier<?>, DOMRpcImplementationRegistration<RoutedGetConstantService>>
139 routedRegistrations = new HashMap<>();
141 private final Map<String, ListenerRegistration<YnlListener>> ynlRegistrations = new HashMap<>();
143 private DOMRpcImplementationRegistration<GetConstantService> globalGetConstantRegistration = null;
144 private ClusterSingletonServiceRegistration getSingletonConstantRegistration;
145 private FlappingSingletonService flappingSingletonService;
146 private ListenerRegistration<DOMDataTreeChangeListener> dtclReg;
147 private IdIntsListener idIntsListener;
148 private final Map<String, PublishNotificationsTask> publishNotificationsTasks = new HashMap<>();
149 private ListenerRegistration<IdIntsDOMDataTreeLIstener> ddtlReg;
150 private IdIntsDOMDataTreeLIstener idIntsDdtl;
154 public MdsalLowLevelTestProvider(final RpcProviderRegistry rpcRegistry,
155 final DOMRpcProviderService domRpcService,
156 final ClusterSingletonServiceProvider singletonService,
157 final SchemaService schemaService,
158 final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer,
159 final NotificationPublishService notificationPublishService,
160 final NotificationService notificationService,
161 final DOMDataBroker domDataBroker,
162 final DOMDataTreeService domDataTreeService,
163 final DistributedShardFactory distributedShardFactory,
164 final DistributedDataStoreInterface configDataStore,
165 final ActorSystemProvider actorSystemProvider) {
166 this.rpcRegistry = rpcRegistry;
167 this.domRpcService = domRpcService;
168 this.singletonService = singletonService;
169 this.schemaService = schemaService;
170 this.bindingNormalizedNodeSerializer = bindingNormalizedNodeSerializer;
171 this.notificationPublishService = notificationPublishService;
172 this.notificationService = notificationService;
173 this.domDataBroker = domDataBroker;
174 this.domDataTreeService = domDataTreeService;
175 this.distributedShardFactory = distributedShardFactory;
176 this.configDataStore = configDataStore;
177 this.actorSystem = actorSystemProvider.getActorSystem();
179 this.prefixLeaderHandler = new PrefixLeaderHandler(domDataTreeService, bindingNormalizedNodeSerializer);
181 domDataTreeChangeService =
182 (DOMDataTreeChangeService) domDataBroker.getSupportedExtensions().get(DOMDataTreeChangeService.class);
184 registration = rpcRegistry.addRpcImplementation(OdlMdsalLowlevelControlService.class, this);
186 prefixShardHandler = new PrefixShardHandler(distributedShardFactory, domDataTreeService,
187 bindingNormalizedNodeSerializer);
191 @SuppressWarnings("checkstyle:IllegalCatch")
192 public Future<RpcResult<Void>> unregisterSingletonConstant() {
193 LOG.debug("unregister-singleton-constant");
195 if (getSingletonConstantRegistration == null) {
196 LOG.debug("No get-singleton-constant registration present.");
197 final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "missing-registration",
198 "No get-singleton-constant rpc registration present.");
199 final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
200 return Futures.immediateFuture(result);
204 getSingletonConstantRegistration.close();
205 getSingletonConstantRegistration = null;
207 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
208 } catch (Exception e) {
209 LOG.debug("There was a problem closing the singleton constant service", e);
210 final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "error-closing",
211 "There was a problem closing get-singleton-constant");
212 final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
213 return Futures.immediateFuture(result);
218 public Future<RpcResult<Void>> startPublishNotifications(final StartPublishNotificationsInput input) {
219 LOG.debug("publish-notifications, input: {}", input);
221 final PublishNotificationsTask task = new PublishNotificationsTask(notificationPublishService, input.getId(),
222 input.getSeconds(), input.getNotificationsPerSecond());
224 publishNotificationsTasks.put(input.getId(), task);
228 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
232 public Future<RpcResult<Void>> subscribeDtcl() {
234 if (dtclReg != null) {
235 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
236 "There is already dataTreeChangeListener registered on id-ints list.");
237 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
240 idIntsListener = new IdIntsListener();
242 dtclReg = domDataTreeChangeService
243 .registerDataTreeChangeListener(
244 new org.opendaylight.controller.md.sal.dom.api.DOMDataTreeIdentifier(
245 CONTROLLER_CONFIG, WriteTransactionsHandler.ID_INT_YID),
248 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
252 public Future<RpcResult<WriteTransactionsOutput>> writeTransactions(final WriteTransactionsInput input) {
253 LOG.debug("write-transactions, input: {}", input);
254 return WriteTransactionsHandler.start(domDataBroker, input);
258 public Future<RpcResult<IsClientAbortedOutput>> isClientAborted() {
263 public Future<RpcResult<Void>> removeShardReplica(final RemoveShardReplicaInput input) {
268 public Future<RpcResult<Void>> subscribeYnl(final SubscribeYnlInput input) {
270 LOG.debug("subscribe-ynl, input: {}", input);
272 if (ynlRegistrations.containsKey(input.getId())) {
273 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
274 "There is already ynl listener registered for this id: " + input.getId());
275 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
278 ynlRegistrations.put(input.getId(),
279 notificationService.registerNotificationListener(new YnlListener(input.getId())));
281 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
285 public Future<RpcResult<Void>> removePrefixShard(final RemovePrefixShardInput input) {
286 LOG.debug("remove-prefix-shard, input: {}", input);
288 return prefixShardHandler.onRemovePrefixShard(input);
292 public Future<RpcResult<Void>> becomePrefixLeader(final BecomePrefixLeaderInput input) {
293 LOG.debug("become-prefix-leader, input: {}", input);
295 return prefixLeaderHandler.makeLeaderLocal(input);
299 public Future<RpcResult<Void>> unregisterBoundConstant(final UnregisterBoundConstantInput input) {
300 LOG.debug("unregister-bound-constant, {}", input);
302 final DOMRpcImplementationRegistration<RoutedGetConstantService> rpcRegistration =
303 routedRegistrations.remove(input.getContext());
305 if (rpcRegistration == null) {
306 LOG.debug("No get-contexted-constant registration for context: {}", input.getContext());
307 final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "missing-registration",
308 "No get-constant rpc registration present.");
309 final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
310 return Futures.immediateFuture(result);
313 rpcRegistration.close();
314 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
318 public Future<RpcResult<Void>> registerSingletonConstant(final RegisterSingletonConstantInput input) {
320 LOG.debug("Received register-singleton-constant rpc, input: {}", input);
322 if (input.getConstant() == null) {
323 final RpcError error = RpcResultBuilder.newError(
324 ErrorType.RPC, "Invalid input.", "Constant value is null");
325 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
328 getSingletonConstantRegistration =
329 SingletonGetConstantService.registerNew(singletonService, domRpcService, input.getConstant());
331 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
335 public Future<RpcResult<Void>> registerDefaultConstant(final RegisterDefaultConstantInput input) {
340 public Future<RpcResult<Void>> unregisterConstant() {
342 if (globalGetConstantRegistration == null) {
343 final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "missing-registration",
344 "No get-constant rpc registration present.");
345 final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
346 return Futures.immediateFuture(result);
349 globalGetConstantRegistration.close();
350 globalGetConstantRegistration = null;
352 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
356 public Future<RpcResult<UnregisterFlappingSingletonOutput>> unregisterFlappingSingleton() {
357 LOG.debug("unregister-flapping-singleton received.");
359 if (flappingSingletonService == null) {
360 final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "missing-registration",
361 "No flapping-singleton registration present.");
362 final RpcResult<UnregisterFlappingSingletonOutput> result =
363 RpcResultBuilder.<UnregisterFlappingSingletonOutput>failed().withRpcError(rpcError).build();
364 return Futures.immediateFuture(result);
367 final long flapCount = flappingSingletonService.setInactive();
368 flappingSingletonService = null;
370 final UnregisterFlappingSingletonOutput output =
371 new UnregisterFlappingSingletonOutputBuilder().setFlapCount(flapCount).build();
373 return Futures.immediateFuture(RpcResultBuilder.success(output).build());
377 public Future<RpcResult<Void>> addShardReplica(final AddShardReplicaInput input) {
382 public Future<RpcResult<Void>> subscribeDdtl() {
384 if (ddtlReg != null) {
385 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
386 "There is already dataTreeChangeListener registered on id-ints list.");
387 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
390 idIntsDdtl = new IdIntsDOMDataTreeLIstener();
393 ddtlReg = domDataTreeService.registerListener(idIntsDdtl,
394 Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION,
395 ProduceTransactionsHandler.ID_INT_YID)),
396 true, Collections.emptyList());
397 } catch (DOMDataTreeLoopException e) {
398 LOG.error("Failed to register DOMDataTreeListener.", e);
402 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
406 public Future<RpcResult<Void>> registerBoundConstant(final RegisterBoundConstantInput input) {
407 LOG.debug("register-bound-constant: {}", input);
409 if (input.getContext() == null) {
410 final RpcError error = RpcResultBuilder.newError(
411 ErrorType.RPC, "Invalid input.", "Context value is null");
412 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
415 if (input.getConstant() == null) {
416 final RpcError error = RpcResultBuilder.newError(
417 ErrorType.RPC, "Invalid input.", "Constant value is null");
418 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
421 if (routedRegistrations.containsKey(input.getContext())) {
422 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
423 "There is already a rpc registered for context: " + input.getContext());
424 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
427 final DOMRpcImplementationRegistration<RoutedGetConstantService> rpcRegistration =
428 RoutedGetConstantService.registerNew(bindingNormalizedNodeSerializer, domRpcService,
429 input.getConstant(), input.getContext());
431 routedRegistrations.put(input.getContext(), rpcRegistration);
432 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
436 public Future<RpcResult<Void>> registerFlappingSingleton() {
437 LOG.debug("Received register-flapping-singleton.");
439 if (flappingSingletonService != null) {
440 final RpcError error = RpcResultBuilder.newError(
441 ErrorType.RPC, "Registration present.", "flapping-singleton already registered");
442 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
445 flappingSingletonService = new FlappingSingletonService(singletonService);
447 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
451 public Future<RpcResult<UnsubscribeDtclOutput>> unsubscribeDtcl() {
452 LOG.debug("Received unsubscribe-dtcl");
454 if (idIntsListener == null || dtclReg == null) {
455 final RpcError error = RpcResultBuilder.newError(
456 ErrorType.RPC, "Dtcl missing.", "No DataTreeChangeListener registered.");
457 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
458 .withRpcError(error).build());
462 idIntsListener.tryFinishProcessing().get(120, TimeUnit.SECONDS);
463 } catch (InterruptedException | ExecutionException | TimeoutException e) {
464 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "resource-denied-transport",
465 "Unable to finish notification processing in 120 seconds.", "clustering-it", "clustering-it", e);
466 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
467 .withRpcError(error).build());
473 if (!idIntsListener.hasTriggered()) {
474 final RpcError error = RpcResultBuilder.newError(
475 ErrorType.APPLICATION, "No notification received.", "id-ints listener has not received"
476 + "any notifications.");
477 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
478 .withRpcError(error).build());
481 final DOMDataReadOnlyTransaction rTx = domDataBroker.newReadOnlyTransaction();
483 final Optional<NormalizedNode<?, ?>> readResult =
484 rTx.read(CONTROLLER_CONFIG, WriteTransactionsHandler.ID_INT_YID).checkedGet();
486 if (!readResult.isPresent()) {
487 final RpcError error = RpcResultBuilder.newError(
488 ErrorType.APPLICATION, "Final read empty.", "No data read from id-ints list.");
489 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
490 .withRpcError(error).build());
493 return Futures.immediateFuture(
494 RpcResultBuilder.success(new UnsubscribeDtclOutputBuilder()
495 .setCopyMatches(idIntsListener.checkEqual(readResult.get()))).build());
497 } catch (final ReadFailedException e) {
498 final RpcError error = RpcResultBuilder.newError(
499 ErrorType.APPLICATION, "Read failed.", "Final read from id-ints failed.");
500 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
501 .withRpcError(error).build());
507 public Future<RpcResult<Void>> createPrefixShard(final CreatePrefixShardInput input) {
508 LOG.debug("create-prefix-shard, input: {}", input);
510 return prefixShardHandler.onCreatePrefixShard(input);
514 public Future<RpcResult<Void>> deconfigureIdIntsShard() {
519 public Future<RpcResult<UnsubscribeYnlOutput>> unsubscribeYnl(final UnsubscribeYnlInput input) {
520 LOG.debug("Received unsubscribe-ynl, input: {}", input);
522 if (!ynlRegistrations.containsKey(input.getId())) {
523 final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "missing-registration",
524 "No ynl listener with this id registered.");
525 final RpcResult<UnsubscribeYnlOutput> result =
526 RpcResultBuilder.<UnsubscribeYnlOutput>failed().withRpcError(rpcError).build();
527 return Futures.immediateFuture(result);
530 final ListenerRegistration<YnlListener> reg = ynlRegistrations.remove(input.getId());
531 final UnsubscribeYnlOutput output = reg.getInstance().getOutput();
535 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeYnlOutput>success().withResult(output).build());
539 public Future<RpcResult<CheckPublishNotificationsOutput>> checkPublishNotifications(
540 final CheckPublishNotificationsInput input) {
542 final PublishNotificationsTask task = publishNotificationsTasks.get(input.getId());
545 return Futures.immediateFuture(RpcResultBuilder.success(
546 new CheckPublishNotificationsOutputBuilder().setActive(false)).build());
549 final CheckPublishNotificationsOutputBuilder checkPublishNotificationsOutputBuilder =
550 new CheckPublishNotificationsOutputBuilder().setActive(!task.isFinished());
552 if (task.getLastError() != null) {
553 LOG.error("Last error for {}", task, task.getLastError());
554 checkPublishNotificationsOutputBuilder.setLastError(task.getLastError().toString());
557 final CheckPublishNotificationsOutput output =
558 checkPublishNotificationsOutputBuilder.setPublishCount(task.getCurrentNotif()).build();
560 return Futures.immediateFuture(RpcResultBuilder.success(output).build());
564 public Future<RpcResult<ProduceTransactionsOutput>> produceTransactions(final ProduceTransactionsInput input) {
565 LOG.debug("producer-transactions, input: {}", input);
566 return ProduceTransactionsHandler.start(domDataTreeService, input);
570 public Future<RpcResult<Void>> shutdownShardReplica(final ShutdownShardReplicaInput input) {
571 LOG.debug("Received shutdown-shard-replica rpc, input: {}", input);
573 final String shardName = input.getShardName();
574 if (Strings.isNullOrEmpty(shardName)) {
575 final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "bad-element",
576 "A valid shard name must be specified");
577 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(rpcError).build());
580 return shutdownShardGracefully(shardName);
584 public Future<RpcResult<Void>> shutdownPrefixShardReplica(final ShutdownPrefixShardReplicaInput input) {
585 LOG.debug("Received shutdown-prefix-shard-replica rpc, input: {}", input);
587 final InstanceIdentifier<?> shardPrefix = input.getPrefix();
589 if (shardPrefix == null) {
590 final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "bad-element",
591 "A valid shard prefix must be specified");
592 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(rpcError).build());
595 final YangInstanceIdentifier shardPath = bindingNormalizedNodeSerializer.toYangInstanceIdentifier(shardPrefix);
596 final String cleanPrefixShardName = ClusterUtils.getCleanShardName(shardPath);
598 return shutdownShardGracefully(cleanPrefixShardName);
601 private SettableFuture<RpcResult<Void>> shutdownShardGracefully(final String shardName) {
602 final SettableFuture<RpcResult<Void>> rpcResult = SettableFuture.create();
603 final ActorContext context = configDataStore.getActorContext();
605 long timeoutInMS = Math.max(context.getDatastoreContext().getShardRaftConfig()
606 .getElectionTimeOutInterval().$times(3).toMillis(), 10000);
607 final FiniteDuration duration = FiniteDuration.apply(timeoutInMS, TimeUnit.MILLISECONDS);
608 final scala.concurrent.Promise<Boolean> shutdownShardAsk = akka.dispatch.Futures.promise();
610 context.findLocalShardAsync(shardName).onComplete(new OnComplete<ActorRef>() {
612 public void onComplete(final Throwable throwable, final ActorRef actorRef) throws Throwable {
613 if (throwable != null) {
614 shutdownShardAsk.failure(throwable);
616 shutdownShardAsk.completeWith(Patterns.gracefulStop(actorRef, duration, Shutdown.INSTANCE));
619 }, context.getClientDispatcher());
621 shutdownShardAsk.future().onComplete(new OnComplete<Boolean>() {
623 public void onComplete(final Throwable throwable, final Boolean gracefulStopResult) throws Throwable {
624 if (throwable != null) {
625 final RpcResult<Void> failedResult = RpcResultBuilder.<Void>failed()
626 .withError(ErrorType.APPLICATION, "Failed to gracefully shutdown shard", throwable).build();
627 rpcResult.set(failedResult);
629 // according to Patterns.gracefulStop API, we don't have to
630 // check value of gracefulStopResult
631 rpcResult.set(RpcResultBuilder.<Void>success().build());
634 }, context.getClientDispatcher());
639 public Future<RpcResult<Void>> registerConstant(final RegisterConstantInput input) {
641 LOG.debug("Received register-constant rpc, input: {}", input);
643 if (input.getConstant() == null) {
644 final RpcError error = RpcResultBuilder.newError(
645 ErrorType.RPC, "Invalid input.", "Constant value is null");
646 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
649 if (globalGetConstantRegistration != null) {
650 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
651 "There is already a get-constant rpc registered.");
652 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
655 globalGetConstantRegistration = GetConstantService.registerNew(domRpcService, input.getConstant());
656 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
660 public Future<RpcResult<Void>> unregisterDefaultConstant() {
665 @SuppressWarnings("checkstyle:IllegalCatch")
666 public Future<RpcResult<UnsubscribeDdtlOutput>> unsubscribeDdtl() {
667 LOG.debug("Received unsubscribe-ddtl.");
669 if (idIntsDdtl == null || ddtlReg == null) {
670 final RpcError error = RpcResultBuilder.newError(
671 ErrorType.RPC, "Ddtl missing.", "No DOMDataTreeListener registered.");
672 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
673 .withRpcError(error).build());
677 idIntsDdtl.tryFinishProcessing().get(120, TimeUnit.SECONDS);
678 } catch (InterruptedException | ExecutionException | TimeoutException e) {
679 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "resource-denied-transport",
680 "Unable to finish notification processing in 120 seconds.", "clustering-it", "clustering-it", e);
681 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
682 .withRpcError(error).build());
688 if (!idIntsDdtl.hasTriggered()) {
689 final RpcError error = RpcResultBuilder.newError(
690 ErrorType.APPLICATION, "No notification received.", "id-ints listener has not received"
691 + "any notifications.");
692 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
693 .withRpcError(error).build());
696 final String shardName = ClusterUtils.getCleanShardName(ProduceTransactionsHandler.ID_INTS_YID);
697 LOG.debug("Creating distributed datastore client for shard {}", shardName);
699 final ActorContext actorContext = configDataStore.getActorContext();
700 final Props distributedDataStoreClientProps =
701 SimpleDataStoreClientActor.props(actorContext.getCurrentMemberName(),
702 "Shard-" + shardName, actorContext, shardName);
704 final ActorRef clientActor = actorSystem.actorOf(distributedDataStoreClientProps);
705 final DataStoreClient distributedDataStoreClient;
707 distributedDataStoreClient = SimpleDataStoreClientActor
708 .getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS);
709 } catch (RuntimeException e) {
710 LOG.error("Failed to get actor for {}", distributedDataStoreClientProps, e);
711 clientActor.tell(PoisonPill.getInstance(), noSender());
712 final RpcError error = RpcResultBuilder.newError(
713 ErrorType.APPLICATION, "Unable to create ds client for read.",
714 "Unable to create ds client for read.");
715 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
716 .withRpcError(error).build());
719 final ClientLocalHistory localHistory = distributedDataStoreClient.createLocalHistory();
720 final ClientTransaction tx = localHistory.createTransaction();
721 final CheckedFuture<Optional<NormalizedNode<?, ?>>,
722 org.opendaylight.mdsal.common.api.ReadFailedException> read =
723 tx.read(YangInstanceIdentifier.of(ProduceTransactionsHandler.ID_INT));
726 localHistory.close();
728 final Optional<NormalizedNode<?, ?>> optional = read.checkedGet();
729 if (!optional.isPresent()) {
730 LOG.warn("Final read from client is empty.");
731 final RpcError error = RpcResultBuilder.newError(
732 ErrorType.APPLICATION, "Read failed.", "Final read from id-ints is empty.");
733 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
734 .withRpcError(error).build());
737 return Futures.immediateFuture(
738 RpcResultBuilder.success(new UnsubscribeDdtlOutputBuilder()
739 .setCopyMatches(idIntsDdtl.checkEqual(optional.get()))).build());
741 } catch (org.opendaylight.mdsal.common.api.ReadFailedException e) {
742 LOG.error("Unable to read data to verify ddtl data.", e);
743 final RpcError error = RpcResultBuilder.newError(
744 ErrorType.APPLICATION, "Read failed.", "Final read from id-ints failed.");
745 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
746 .withRpcError(error).build());
748 distributedDataStoreClient.close();
749 clientActor.tell(PoisonPill.getInstance(), noSender());