2 * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.clustering.it.provider;
11 import static akka.actor.ActorRef.noSender;
12 import static org.opendaylight.yangtools.yang.common.RpcResultBuilder.newError;
14 import akka.actor.ActorRef;
15 import akka.actor.ActorSystem;
16 import akka.actor.PoisonPill;
17 import akka.actor.Props;
18 import akka.dispatch.OnComplete;
19 import akka.pattern.Patterns;
20 import com.google.common.base.Optional;
21 import com.google.common.base.Strings;
22 import com.google.common.util.concurrent.CheckedFuture;
23 import com.google.common.util.concurrent.Futures;
24 import com.google.common.util.concurrent.ListenableFuture;
25 import com.google.common.util.concurrent.SettableFuture;
26 import java.io.PrintWriter;
27 import java.io.StringWriter;
28 import java.util.Collections;
29 import java.util.HashMap;
31 import java.util.Objects;
32 import java.util.concurrent.Future;
33 import java.util.concurrent.TimeUnit;
34 import org.opendaylight.controller.cluster.ActorSystemProvider;
35 import org.opendaylight.controller.cluster.databroker.actors.dds.ClientLocalHistory;
36 import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
37 import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
38 import org.opendaylight.controller.cluster.databroker.actors.dds.SimpleDataStoreClientActor;
39 import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
40 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
41 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
42 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
43 import org.opendaylight.controller.cluster.sharding.DistributedShardFactory;
44 import org.opendaylight.controller.clustering.it.provider.impl.DataListenerState;
45 import org.opendaylight.controller.clustering.it.provider.impl.FlappingSingletonService;
46 import org.opendaylight.controller.clustering.it.provider.impl.GetConstantService;
47 import org.opendaylight.controller.clustering.it.provider.impl.IdIntsDOMDataTreeLIstener;
48 import org.opendaylight.controller.clustering.it.provider.impl.IdIntsListener;
49 import org.opendaylight.controller.clustering.it.provider.impl.PrefixLeaderHandler;
50 import org.opendaylight.controller.clustering.it.provider.impl.PrefixShardHandler;
51 import org.opendaylight.controller.clustering.it.provider.impl.ProduceTransactionsHandler;
52 import org.opendaylight.controller.clustering.it.provider.impl.PublishNotificationsTask;
53 import org.opendaylight.controller.clustering.it.provider.impl.RoutedGetConstantService;
54 import org.opendaylight.controller.clustering.it.provider.impl.SingletonGetConstantService;
55 import org.opendaylight.controller.clustering.it.provider.impl.WriteTransactionsHandler;
56 import org.opendaylight.controller.clustering.it.provider.impl.YnlListener;
57 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
58 import org.opendaylight.controller.md.sal.binding.api.NotificationService;
59 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
60 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
61 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
62 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
63 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeService;
64 import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationRegistration;
65 import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService;
66 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
67 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
68 import org.opendaylight.controller.sal.core.api.model.SchemaService;
69 import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
70 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
71 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
72 import org.opendaylight.mdsal.dom.api.DOMDataTreeLoopException;
73 import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
74 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
75 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
76 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.AddShardReplicaInput;
77 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomePrefixLeaderInput;
78 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsInput;
79 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutput;
80 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutputBuilder;
81 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CreatePrefixShardInput;
82 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.IsClientAbortedOutput;
83 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.OdlMdsalLowlevelControlService;
84 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsInput;
85 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutput;
86 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantInput;
87 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantInput;
88 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterDefaultConstantInput;
89 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantInput;
90 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardInput;
91 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemoveShardReplicaInput;
92 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownPrefixShardReplicaInput;
93 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownShardReplicaInput;
94 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsInput;
95 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlInput;
96 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantInput;
97 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutput;
98 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutputBuilder;
99 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutput;
100 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutputBuilder;
101 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutput;
102 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutputBuilder;
103 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlInput;
104 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlOutput;
105 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsInput;
106 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsOutput;
107 import org.opendaylight.yangtools.concepts.ListenerRegistration;
108 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
109 import org.opendaylight.yangtools.yang.common.RpcError;
110 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
111 import org.opendaylight.yangtools.yang.common.RpcResult;
112 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
113 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
114 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
115 import org.slf4j.Logger;
116 import org.slf4j.LoggerFactory;
117 import scala.concurrent.duration.FiniteDuration;
/**
 * Implementation of {@link OdlMdsalLowlevelControlService}. The fields below hold the services
 * handed to the constructor, the handlers built from them, and the mutable registration state
 * that the individual RPC methods create and tear down.
 */
119 public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService {
121 private static final Logger LOG = LoggerFactory.getLogger(MdsalLowLevelTestProvider.class);
// Controller (md.sal) CONFIGURATION datastore type, written fully qualified because the mdsal
// LogicalDatastoreType is the one imported by simple name.
122 private static final org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType CONTROLLER_CONFIG =
123 org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;
// Already-completed successful RpcResult<Void> future, built once through success(null).
125 private static final ListenableFuture<RpcResult<Void>> VOID_SUCCESS = success(null);
// Collaborators assigned once in the constructor (injected, or derived from injected services).
127 private final RpcProviderRegistry rpcRegistry;
// Registration of this object as the OdlMdsalLowlevelControlService implementation.
128 private final BindingAwareBroker.RpcRegistration<OdlMdsalLowlevelControlService> registration;
129 private final DistributedShardFactory distributedShardFactory;
130 private final DistributedDataStoreInterface configDataStore;
131 private final DOMDataTreeService domDataTreeService;
132 private final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer;
133 private final DOMDataBroker domDataBroker;
134 private final NotificationPublishService notificationPublishService;
135 private final NotificationService notificationService;
136 private final SchemaService schemaService;
137 private final ClusterSingletonServiceProvider singletonService;
138 private final DOMRpcProviderService domRpcService;
139 private final PrefixLeaderHandler prefixLeaderHandler;
140 private final PrefixShardHandler prefixShardHandler;
141 private final DOMDataTreeChangeService domDataTreeChangeService;
142 private final ActorSystem actorSystem;
// Routed get-constant registrations, keyed by the context they were registered for.
// NOTE(review): the initializer expression of this field is not visible in this excerpt.
144 private final Map<InstanceIdentifier<?>, DOMRpcImplementationRegistration<RoutedGetConstantService>> routedRegistrations =
// YNL listener registrations, keyed by the id passed to subscribeYnl.
147 private final Map<String, ListenerRegistration<YnlListener>> ynlRegistrations = new HashMap<>();
// Mutable state below is null until the corresponding register/subscribe RPC has been invoked.
149 private DOMRpcImplementationRegistration<GetConstantService> globalGetConstantRegistration = null;
150 private ClusterSingletonServiceRegistration getSingletonConstantRegistration;
151 private FlappingSingletonService flappingSingletonService;
152 private ListenerRegistration<DOMDataTreeChangeListener> dtclReg;
153 private IdIntsListener idIntsListener;
// Notification publishing tasks, keyed by the id passed to startPublishNotifications.
154 private final Map<String, PublishNotificationsTask> publishNotificationsTasks = new HashMap<>();
155 private ListenerRegistration<IdIntsDOMDataTreeLIstener> ddtlReg;
156 private IdIntsDOMDataTreeLIstener idIntsDdtl;
/**
 * Stores the supplied services, derives the remaining collaborators from them and registers this
 * instance as the implementation of {@link OdlMdsalLowlevelControlService}.
 *
 * @param rpcRegistry registry this instance is registered with as RPC implementation
 * @param domRpcService DOM RPC provider service used by the get-constant registrations
 * @param singletonService cluster singleton service provider
 * @param schemaService schema service (only stored here)
 * @param bindingNormalizedNodeSerializer codec passed to the prefix handlers and used by RPC methods
 * @param notificationPublishService service handed to the notification publishing tasks
 * @param notificationService service YNL listeners are registered with
 * @param domDataBroker DOM data broker; also the source of the DOMDataTreeChangeService extension
 * @param domDataTreeService DOM data tree service passed to the prefix handlers
 * @param distributedShardFactory shard factory passed to the PrefixShardHandler
 * @param configDataStore datastore whose ActorContext is used by the shard-related RPCs
 * @param actorSystemProvider provider the ActorSystem is taken from
 */
160 public MdsalLowLevelTestProvider(final RpcProviderRegistry rpcRegistry,
161 final DOMRpcProviderService domRpcService,
162 final ClusterSingletonServiceProvider singletonService,
163 final SchemaService schemaService,
164 final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer,
165 final NotificationPublishService notificationPublishService,
166 final NotificationService notificationService,
167 final DOMDataBroker domDataBroker,
168 final DOMDataTreeService domDataTreeService,
169 final DistributedShardFactory distributedShardFactory,
170 final DistributedDataStoreInterface configDataStore,
171 final ActorSystemProvider actorSystemProvider) {
172 this.rpcRegistry = rpcRegistry;
173 this.domRpcService = domRpcService;
174 this.singletonService = singletonService;
175 this.schemaService = schemaService;
176 this.bindingNormalizedNodeSerializer = bindingNormalizedNodeSerializer;
177 this.notificationPublishService = notificationPublishService;
178 this.notificationService = notificationService;
179 this.domDataBroker = domDataBroker;
180 this.domDataTreeService = domDataTreeService;
181 this.distributedShardFactory = distributedShardFactory;
182 this.configDataStore = configDataStore;
183 this.actorSystem = actorSystemProvider.getActorSystem();
185 this.prefixLeaderHandler = new PrefixLeaderHandler(domDataTreeService, bindingNormalizedNodeSerializer);
// The tree-change service is looked up among the broker's supported extensions and cast;
// no null check is made on the lookup result here.
187 domDataTreeChangeService =
188 (DOMDataTreeChangeService) domDataBroker.getSupportedExtensions().get(DOMDataTreeChangeService.class);
// NOTE(review): 'this' is handed to the RPC registry here, before prefixShardHandler is assigned
// below. Consider moving the registration to the end of the constructor so no RPC can reach a
// partially constructed instance.
190 registration = rpcRegistry.addRpcImplementation(OdlMdsalLowlevelControlService.class, this);
192 prefixShardHandler = new PrefixShardHandler(distributedShardFactory, domDataTreeService,
193 bindingNormalizedNodeSerializer);
/**
 * Handles the unregister-singleton-constant RPC: closes the current get-singleton-constant
 * registration and clears the field.
 *
 * @return a failed result ("missing-registration") when nothing is registered, or a failed result
 *         ("error-closing") when closing throws. NOTE(review): the success-path return is not
 *         visible in this excerpt.
 */
197 public Future<RpcResult<Void>> unregisterSingletonConstant() {
198 LOG.debug("unregister-singleton-constant");
200 if (getSingletonConstantRegistration == null) {
201 LOG.debug("No get-singleton-constant registration present.");
202 return failure(newError(ErrorType.APPLICATION, "missing-registration",
203 "No get-singleton-constant rpc registration present."));
207 getSingletonConstantRegistration.close();
208 } catch (final Exception e) {
209 LOG.debug("There was a problem closing the singleton constant service", e);
210 return failure(newError(ErrorType.APPLICATION, "error-closing",
211 "There was a problem closing get-singleton-constant"));
// Reached only when close() did not throw; after a failed close the registration stays in place.
213 getSingletonConstantRegistration = null;
/**
 * Handles the start-publish-notifications RPC: builds a PublishNotificationsTask from the input's
 * id, seconds and notifications-per-second values and stores it under the input id, where
 * checkPublishNotifications looks it up.
 * NOTE(review): the statements after the map insertion (presumably starting the task and
 * returning) are not visible in this excerpt.
 *
 * @param input RPC input carrying the id, duration in seconds and notification rate
 */
220 public Future<RpcResult<Void>> startPublishNotifications(final StartPublishNotificationsInput input) {
221 LOG.debug("publish-notifications, input: {}", input);
223 final PublishNotificationsTask task = new PublishNotificationsTask(notificationPublishService, input.getId(),
224 input.getSeconds(), input.getNotificationsPerSecond());
// NOTE(review): put() replaces any task already stored under the same id; no visible code stops
// or checks the previous task.
226 publishNotificationsTasks.put(input.getId(), task);
/**
 * Handles the subscribe-dtcl RPC: creates a new IdIntsListener and registers a data tree change
 * listener on WriteTransactionsHandler.ID_INT_YID in the controller CONFIGURATION datastore.
 * NOTE(review): the listener argument of the registration call and the final return are not
 * visible in this excerpt; presumably idIntsListener is what gets registered.
 *
 * @return a failed result ("Registration present.") when a registration already exists
 */
234 public Future<RpcResult<Void>> subscribeDtcl() {
235 if (dtclReg != null) {
236 return failure(newError(ErrorType.RPC, "Registration present.",
237 "There is already dataTreeChangeListener registered on id-ints list."));
240 idIntsListener = new IdIntsListener();
242 dtclReg = domDataTreeChangeService
243 .registerDataTreeChangeListener(
244 new org.opendaylight.controller.md.sal.dom.api.DOMDataTreeIdentifier(
245 CONTROLLER_CONFIG, WriteTransactionsHandler.ID_INT_YID),
248 LOG.debug("ClusteredDOMDataTreeChangeListener registered");
/**
 * Handles the write-transactions RPC by logging the input and delegating to
 * WriteTransactionsHandler.start with the DOM data broker.
 *
 * @param input RPC input, passed through unchanged
 * @return the future produced by WriteTransactionsHandler.start
 */
253 public Future<RpcResult<WriteTransactionsOutput>> writeTransactions(final WriteTransactionsInput input) {
254 LOG.debug("write-transactions, input: {}", input);
255 return WriteTransactionsHandler.start(domDataBroker, input);
/**
 * Placeholder for the is-client-aborted RPC; per the FIXME below it is not implemented.
 * NOTE(review): the value it currently returns is not visible in this excerpt.
 */
259 public Future<RpcResult<IsClientAbortedOutput>> isClientAborted() {
260 // FIXME: implement this
/**
 * Placeholder for the remove-shard-replica RPC; per the FIXME below it is not implemented and the
 * input is not used by any visible line.
 * NOTE(review): the value it currently returns is not visible in this excerpt.
 */
265 public Future<RpcResult<Void>> removeShardReplica(final RemoveShardReplicaInput input) {
266 // FIXME: implement this
/**
 * Handles the subscribe-ynl RPC: registers a new YnlListener for the given id with the
 * notification service and remembers the registration under that id.
 *
 * @param input RPC input carrying the listener id
 * @return a failed result ("Registration present.") when the id already has a listener, otherwise
 *         an immediately successful empty result
 */
271 public Future<RpcResult<Void>> subscribeYnl(final SubscribeYnlInput input) {
272 LOG.debug("subscribe-ynl, input: {}", input);
274 if (ynlRegistrations.containsKey(input.getId())) {
275 return failure(newError(ErrorType.RPC, "Registration present.",
276 "There is already ynl listener registered for this id: " + input.getId()));
279 ynlRegistrations.put(input.getId(),
280 notificationService.registerNotificationListener(new YnlListener(input.getId())));
282 return success(null);
/**
 * Handles the remove-prefix-shard RPC by logging the input and delegating to
 * PrefixShardHandler.onRemovePrefixShard.
 *
 * @param input RPC input, passed through unchanged
 * @return the future produced by the handler
 */
286 public Future<RpcResult<Void>> removePrefixShard(final RemovePrefixShardInput input) {
287 LOG.debug("remove-prefix-shard, input: {}", input);
289 return prefixShardHandler.onRemovePrefixShard(input);
/**
 * Handles the become-prefix-leader RPC by logging the input and delegating to
 * PrefixLeaderHandler.makeLeaderLocal.
 *
 * @param input RPC input, passed through unchanged
 * @return the future produced by the handler
 */
293 public Future<RpcResult<Void>> becomePrefixLeader(final BecomePrefixLeaderInput input) {
294 LOG.debug("become-prefix-leader, input: {}", input);
296 return prefixLeaderHandler.makeLeaderLocal(input);
/**
 * Handles the unregister-bound-constant RPC: removes the routed registration stored for the
 * input's context and closes it.
 * NOTE(review): the success-path return is not visible in this excerpt.
 *
 * @param input RPC input carrying the context whose registration should be removed
 * @return a failed result ("missing-registration") when no registration exists for the context
 */
300 public Future<RpcResult<Void>> unregisterBoundConstant(final UnregisterBoundConstantInput input) {
301 LOG.debug("unregister-bound-constant, {}", input);
// This local shadows the 'registration' field of the class; it is the routed registration only.
303 final DOMRpcImplementationRegistration<RoutedGetConstantService> registration =
304 routedRegistrations.remove(input.getContext());
306 if (registration == null) {
307 LOG.debug("No get-contexted-constant registration for context: {}", input.getContext());
308 return failure(newError(ErrorType.APPLICATION, "missing-registration",
309 "No get-constant rpc registration present."));
312 registration.close();
/**
 * Handles the register-singleton-constant RPC: registers a SingletonGetConstantService for the
 * given constant and keeps the resulting registration.
 * NOTE(review): the success-path return is not visible in this excerpt.
 *
 * @param input RPC input carrying the constant
 * @return a failed result ("Invalid input.") when the constant is null
 */
317 public Future<RpcResult<Void>> registerSingletonConstant(final RegisterSingletonConstantInput input) {
318 LOG.debug("Received register-singleton-constant rpc, input: {}", input);
320 if (input.getConstant() == null) {
321 return failure(newError(ErrorType.RPC, "Invalid input.", "Constant value is null"));
// NOTE(review): unlike registerConstant, there is no check for an existing registration; a
// previous getSingletonConstantRegistration is overwritten here without being closed.
324 getSingletonConstantRegistration =
325 SingletonGetConstantService.registerNew(singletonService, domRpcService, input.getConstant());
/**
 * Placeholder for the register-default-constant RPC; per the FIXME below it is not implemented
 * and the input is not used by any visible line.
 * NOTE(review): the value it currently returns is not visible in this excerpt.
 */
331 public Future<RpcResult<Void>> registerDefaultConstant(final RegisterDefaultConstantInput input) {
332 // FIXME: implement this
/**
 * Handles the unregister-constant RPC: closes the global get-constant registration and clears
 * the field so registerConstant can be invoked again.
 * NOTE(review): the success-path return is not visible in this excerpt.
 *
 * @return a failed result ("missing-registration") when no global registration exists
 */
337 public Future<RpcResult<Void>> unregisterConstant() {
339 if (globalGetConstantRegistration == null) {
340 return failure(newError(ErrorType.APPLICATION, "missing-registration",
341 "No get-constant rpc registration present."));
344 globalGetConstantRegistration.close();
345 globalGetConstantRegistration = null;
/**
 * Handles the unregister-flapping-singleton RPC: deactivates the flapping singleton service,
 * drops the reference to it and reports the value returned by setInactive() as the flap count.
 *
 * @return a failed result ("missing-registration") when no flapping singleton is registered,
 *         otherwise a successful result carrying the flap count
 */
351 public Future<RpcResult<UnregisterFlappingSingletonOutput>> unregisterFlappingSingleton() {
352 LOG.debug("unregister-flapping-singleton received.");
354 if (flappingSingletonService == null) {
355 return failure(newError(ErrorType.APPLICATION, "missing-registration",
356 "No flapping-singleton registration present."));
359 final long flapCount = flappingSingletonService.setInactive();
360 flappingSingletonService = null;
362 return success(new UnregisterFlappingSingletonOutputBuilder().setFlapCount(flapCount).build());
/**
 * Placeholder for the add-shard-replica RPC; per the FIXME below it is not implemented and the
 * input is not used by any visible line.
 * NOTE(review): the value it currently returns is not visible in this excerpt.
 */
366 public Future<RpcResult<Void>> addShardReplica(final AddShardReplicaInput input) {
367 // FIXME: implement this
/**
 * Handles the subscribe-ddtl RPC: creates a new IdIntsDOMDataTreeLIstener and registers it with
 * the DOMDataTreeService for ProduceTransactionsHandler.ID_INT_YID in the CONFIGURATION
 * datastore.
 * NOTE(review): the success-path return is not visible in this excerpt.
 *
 * @return a failed result ("Registration present.") when a registration already exists, or a
 *         failed result ("register-failed") carrying the exception message when registration
 *         throws DOMDataTreeLoopException
 */
372 public Future<RpcResult<Void>> subscribeDdtl() {
373 if (ddtlReg != null) {
// NOTE(review): the message text mentions dataTreeChangeListener although this method guards the
// DOMDataTreeListener registration; it matches the text used in subscribeDtcl.
374 return failure(newError(ErrorType.RPC, "Registration present.",
375 "There is already dataTreeChangeListener registered on id-ints list."));
378 idIntsDdtl = new IdIntsDOMDataTreeLIstener();
// Trailing arguments: 'true' and an empty collection — presumably allowRxMerges and the producer
// list of DOMDataTreeService.registerListener; confirm against that API.
381 ddtlReg = domDataTreeService.registerListener(idIntsDdtl,
382 Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION,
383 ProduceTransactionsHandler.ID_INT_YID)), true, Collections.emptyList());
384 } catch (DOMDataTreeLoopException e) {
385 LOG.error("Failed to register DOMDataTreeListener.", e);
386 return failure(newError(ErrorType.APPLICATION, "register-failed", e.getMessage()));
389 LOG.debug("DOMDataTreeListener registered");
/**
 * Handles the register-bound-constant RPC: registers a RoutedGetConstantService for the given
 * constant and context and stores the registration under that context.
 * NOTE(review): the success-path return is not visible in this excerpt.
 *
 * @param input RPC input carrying the context and the constant
 * @return a failed result ("Invalid input.") when the context or the constant is null, or a
 *         failed result ("Registration present.") when the context already has a registration
 */
394 public Future<RpcResult<Void>> registerBoundConstant(final RegisterBoundConstantInput input) {
395 LOG.debug("register-bound-constant: {}", input);
397 if (input.getContext() == null) {
398 return failure(newError(ErrorType.RPC, "Invalid input.", "Context value is null"));
401 if (input.getConstant() == null) {
402 return failure(newError(ErrorType.RPC, "Invalid input.", "Constant value is null"));
405 if (routedRegistrations.containsKey(input.getContext())) {
406 return failure(newError(ErrorType.RPC, "Registration present.",
407 "There is already a rpc registered for context: " + input.getContext()));
// This local shadows the 'registration' field of the class.
410 final DOMRpcImplementationRegistration<RoutedGetConstantService> registration =
411 RoutedGetConstantService.registerNew(bindingNormalizedNodeSerializer, domRpcService,
412 input.getConstant(), input.getContext());
414 routedRegistrations.put(input.getContext(), registration);
/**
 * Handles the register-flapping-singleton RPC: creates a FlappingSingletonService on top of the
 * cluster singleton service provider and keeps a reference to it.
 * NOTE(review): the success-path return is not visible in this excerpt.
 *
 * @return a failed result ("Registration present.") when a flapping singleton already exists
 */
419 public Future<RpcResult<Void>> registerFlappingSingleton() {
420 LOG.debug("Received register-flapping-singleton.");
422 if (flappingSingletonService != null) {
423 return failure(RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
424 "flapping-singleton already registered"));
427 flappingSingletonService = new FlappingSingletonService(singletonService);
/**
 * Handles the unsubscribe-dtcl RPC: asks the id-ints listener to finish processing for the
 * current registration and converts the resulting DataListenerState through dtclOutput.
 * NOTE(review): lines between the tryFinishProcessing call and the return may be missing from
 * this excerpt (for instance resetting dtclReg / idIntsListener) — confirm against the full file.
 *
 * @return a failed result ("Dtcl missing.") when no listener or registration exists; otherwise a
 *         future of the dtclOutput result, with any failure of that future mapped to a failed
 *         result ("resource-denied-transport")
 */
432 public Future<RpcResult<UnsubscribeDtclOutput>> unsubscribeDtcl() {
433 LOG.debug("Received unsubscribe-dtcl");
435 if (idIntsListener == null || dtclReg == null) {
436 return failure(newError(ErrorType.RPC, "Dtcl missing.", "No DataTreeChangeListener registered."));
439 final ListenableFuture<DataListenerState> future = idIntsListener.tryFinishProcessing(dtclReg);
// The fallback turns a failed future into a successfully completed future holding a failed
// RpcResult, so callers always receive an RpcResult.
442 return Futures.withFallback(Futures.transform(future, this::dtclOutput),
443 t -> failure(newError(ErrorType.RPC, "resource-denied-transport", "Failed to finish processing",
444 "clustering-it", "clustering-it", t)));
/**
 * Builds the unsubscribe-dtcl result from the listener's final state: reads
 * WriteTransactionsHandler.ID_INT_YID from the controller CONFIGURATION datastore and reports in
 * copy-matches whether it equals the last data the listener saw.
 * NOTE(review): the closing of the read-only transaction and the final return of the built result
 * are not visible in this excerpt.
 *
 * @param state final state reported by the id-ints listener
 * @return a failed result ("No notification received.") when the listener saw no changes, or a
 *         failed result ("Read failed.") when the datastore read fails
 */
447 private RpcResult<UnsubscribeDtclOutput> dtclOutput(final DataListenerState state) {
448 if (state.changeCount() == 0) {
449 return failed(newError(ErrorType.APPLICATION, "No notification received.",
450 "id-ints listener has not received any notifications."));
453 final DOMDataReadOnlyTransaction rTx = domDataBroker.newReadOnlyTransaction();
454 final Optional<NormalizedNode<?, ?>> readResult;
// checkedGet() waits for the read to complete; the ReadFailedException itself is neither logged
// nor attached to the returned error.
457 readResult = rTx.read(CONTROLLER_CONFIG, WriteTransactionsHandler.ID_INT_YID).checkedGet();
458 } catch (final ReadFailedException e) {
459 return failed(newError(ErrorType.APPLICATION, "Read failed.", "Final read from id-ints failed."));
// Absent values on either side become null, so two absent values compare as equal.
464 final NormalizedNode<?, ?> expected = state.lastData().orNull();
465 final NormalizedNode<?, ?> actual = readResult.orNull();
466 final boolean equal = Objects.equals(expected, actual);
468 LOG.debug("Expected result {} read resulted in {}", expected, actual);
471 final RpcResultBuilder<UnsubscribeDtclOutput> b = RpcResultBuilder.success(new UnsubscribeDtclOutputBuilder()
472 .setCopyMatches(equal).build());
// Disabled code: would have attached a warning per listener violation to the result.
474 // for (DataListenerViolation violation : state.violations()) {
475 // final Optional<NormalizedNodeDiff> diff = violation.toDiff();
476 // if (diff.isPresent()) {
477 // b.withWarning(ErrorType.APPLICATION, "Sequence mismatch", diff.get().toString());
/**
 * Handles the create-prefix-shard RPC by logging the input and delegating to
 * PrefixShardHandler.onCreatePrefixShard.
 *
 * @param input RPC input, passed through unchanged
 * @return the future produced by the handler
 */
485 public Future<RpcResult<Void>> createPrefixShard(final CreatePrefixShardInput input) {
486 LOG.debug("create-prefix-shard, input: {}", input);
488 return prefixShardHandler.onCreatePrefixShard(input);
/**
 * Placeholder for the deconfigure-id-ints-shard RPC; per the FIXME below it is not implemented.
 * NOTE(review): the value it currently returns is not visible in this excerpt.
 */
492 public Future<RpcResult<Void>> deconfigureIdIntsShard() {
493 // FIXME: implement this
/**
 * Handles the unsubscribe-ynl RPC: removes the listener registration stored for the given id,
 * takes the output from the listener instance, closes the registration and returns that output.
 *
 * @param input RPC input carrying the listener id
 * @return a failed result ("missing-registration") when the id has no listener, otherwise a
 *         successful result with the listener's output
 */
498 public Future<RpcResult<UnsubscribeYnlOutput>> unsubscribeYnl(final UnsubscribeYnlInput input) {
499 LOG.debug("Received unsubscribe-ynl, input: {}", input);
501 if (!ynlRegistrations.containsKey(input.getId())) {
502 return failure(newError(ErrorType.APPLICATION, "missing-registration",
503 "No ynl listener with this id registered."));
// This local shadows the 'registration' field of the class.
506 final ListenerRegistration<YnlListener> registration = ynlRegistrations.remove(input.getId());
// The output is taken before the registration is closed.
507 final UnsubscribeYnlOutput output = registration.getInstance().getOutput();
509 registration.close();
511 return success(output);
/**
 * Handles the check-publish-notifications RPC: reports whether the task stored for the given id
 * is still active, how many notifications it has published and, if present, its last error.
 *
 * @param input RPC input carrying the task id
 * @return a successful result; active is false when the early return below is taken (presumably
 *         when no task is stored for the id — the guarding condition is not visible in this
 *         excerpt), otherwise active is the negation of task.isFinished()
 */
515 public Future<RpcResult<CheckPublishNotificationsOutput>> checkPublishNotifications(
516 final CheckPublishNotificationsInput input) {
518 final PublishNotificationsTask task = publishNotificationsTasks.get(input.getId());
520 return success(new CheckPublishNotificationsOutputBuilder().setActive(Boolean.FALSE).build());
523 final CheckPublishNotificationsOutputBuilder checkPublishNotificationsOutputBuilder =
524 new CheckPublishNotificationsOutputBuilder().setActive(!task.isFinished());
526 if (task.getLastError() != null) {
// The stack trace is rendered into a string so it can be carried in the RPC output.
527 final StringWriter sw = new StringWriter();
528 final PrintWriter pw = new PrintWriter(sw);
529 task.getLastError().printStackTrace(pw);
// NOTE(review): printStackTrace output already begins with the throwable's toString(), so that
// text appears twice here, with no separator between the two parts.
530 checkPublishNotificationsOutputBuilder.setLastError(task.getLastError().toString() + sw.toString());
533 return success(checkPublishNotificationsOutputBuilder.setPublishCount(task.getCurrentNotif()).build());
/**
 * Handles the produce-transactions RPC by logging the input and delegating to
 * ProduceTransactionsHandler.start with the DOM data tree service.
 *
 * @param input RPC input, passed through unchanged
 * @return the future produced by ProduceTransactionsHandler.start
 */
537 public Future<RpcResult<ProduceTransactionsOutput>> produceTransactions(final ProduceTransactionsInput input) {
// NOTE(review): the log text reads "producer-transactions" while the method is
// produceTransactions; likely a typo in the message.
538 LOG.debug("producer-transactions, input: {}", input);
539 return ProduceTransactionsHandler.start(domDataTreeService, input);
/**
 * Handles the shutdown-shard-replica RPC: validates the shard name and delegates to
 * shutdownShardGracefully.
 *
 * @param input RPC input carrying the shard name
 * @return a failed result ("bad-element") when the shard name is null or empty, otherwise the
 *         future returned by shutdownShardGracefully
 */
543 public Future<RpcResult<Void>> shutdownShardReplica(final ShutdownShardReplicaInput input) {
544 LOG.debug("Received shutdown-shard-replica rpc, input: {}", input);
546 final String shardName = input.getShardName();
547 if (Strings.isNullOrEmpty(shardName)) {
548 return failure(newError(ErrorType.APPLICATION, "bad-element", "A valid shard name must be specified"));
551 return shutdownShardGracefully(shardName);
/**
 * Handles the shutdown-prefix-shard-replica RPC: converts the binding prefix to a
 * YangInstanceIdentifier, derives the shard name from it via ClusterUtils.getCleanShardName and
 * delegates to shutdownShardGracefully.
 *
 * @param input RPC input carrying the shard prefix
 * @return a failed result ("bad-element") when the prefix is null, otherwise the future returned
 *         by shutdownShardGracefully
 */
555 public Future<RpcResult<Void>> shutdownPrefixShardReplica(final ShutdownPrefixShardReplicaInput input) {
556 LOG.debug("Received shutdown-prefix-shard-replica rpc, input: {}", input);
558 final InstanceIdentifier<?> shardPrefix = input.getPrefix();
559 if (shardPrefix == null) {
560 return failure(newError(ErrorType.APPLICATION, "bad-element", "A valid shard prefix must be specified"));
563 final YangInstanceIdentifier shardPath = bindingNormalizedNodeSerializer.toYangInstanceIdentifier(shardPrefix);
564 final String cleanPrefixShardName = ClusterUtils.getCleanShardName(shardPath);
566 return shutdownShardGracefully(cleanPrefixShardName);
/**
 * Looks up the local shard actor with the given name and asks it to stop gracefully with a
 * Shutdown message, reporting the outcome through a SettableFuture.
 * NOTE(review): the final return (presumably rpcResult) and the else-branch delimiters are not
 * visible in this excerpt.
 *
 * @param shardName name of the local shard to shut down
 * @return future set to a failed result ("Failed to gracefully shutdown shard") when the lookup
 *         or the stop fails, and to a successful empty result otherwise
 */
569 private SettableFuture<RpcResult<Void>> shutdownShardGracefully(final String shardName) {
570 final SettableFuture<RpcResult<Void>> rpcResult = SettableFuture.create();
571 final ActorContext context = configDataStore.getActorContext();
// Stop timeout: three times the shard's election timeout interval, but at least 10000 ms.
573 long timeoutInMS = Math.max(context.getDatastoreContext().getShardRaftConfig()
574 .getElectionTimeOutInterval().$times(3).toMillis(), 10000);
575 final FiniteDuration duration = FiniteDuration.apply(timeoutInMS, TimeUnit.MILLISECONDS);
// Promise bridging the two asynchronous steps: shard lookup, then graceful stop.
576 final scala.concurrent.Promise<Boolean> shutdownShardAsk = akka.dispatch.Futures.promise();
578 context.findLocalShardAsync(shardName).onComplete(new OnComplete<ActorRef>() {
580 public void onComplete(final Throwable throwable, final ActorRef actorRef) throws Throwable {
581 if (throwable != null) {
// Lookup failed: propagate the failure to the promise.
582 shutdownShardAsk.failure(throwable);
// Lookup succeeded: the promise completes with the outcome of the graceful stop.
584 shutdownShardAsk.completeWith(Patterns.gracefulStop(actorRef, duration, Shutdown.INSTANCE));
587 }, context.getClientDispatcher());
589 shutdownShardAsk.future().onComplete(new OnComplete<Boolean>() {
591 public void onComplete(final Throwable throwable, final Boolean gracefulStopResult) {
592 if (throwable != null) {
593 rpcResult.set(RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION,
594 "Failed to gracefully shutdown shard", throwable).build());
596 // according to Patterns.gracefulStop API, we don't have to
597 // check value of gracefulStopResult
598 rpcResult.set(RpcResultBuilder.<Void>success().build());
601 }, context.getClientDispatcher());
/**
 * Handles the register-constant RPC: registers a global GetConstantService for the given
 * constant and keeps the registration.
 * NOTE(review): the success-path return is not visible in this excerpt.
 *
 * @param input RPC input carrying the constant
 * @return a failed result ("Invalid input.") when the constant is null, or a failed result
 *         ("Registration present.") when a global registration already exists
 */
606 public Future<RpcResult<Void>> registerConstant(final RegisterConstantInput input) {
608 LOG.debug("Received register-constant rpc, input: {}", input);
610 if (input.getConstant() == null) {
611 return failure(newError(ErrorType.RPC, "Invalid input.", "Constant value is null"));
614 if (globalGetConstantRegistration != null) {
615 return failure(newError(ErrorType.RPC, "Registration present.",
616 "There is already a get-constant rpc registered."));
619 globalGetConstantRegistration = GetConstantService.registerNew(domRpcService, input.getConstant());
/**
 * Placeholder for the unregister-default-constant RPC; per the FIXME below it is not implemented.
 * NOTE(review): the value it currently returns is not visible in this excerpt.
 */
624 public Future<RpcResult<Void>> unregisterDefaultConstant() {
625 // FIXME: implement this
/**
 * Handles the unsubscribe-ddtl RPC: asks the id-ints DOMDataTreeListener to finish processing for
 * the current registration and converts the resulting DataListenerState through ddtlOutput.
 * NOTE(review): lines between the tryFinishProcessing call and the return may be missing from
 * this excerpt (for instance resetting ddtlReg / idIntsDdtl) — confirm against the full file.
 *
 * @return a failed result ("Ddtl missing.") when no listener or registration exists; otherwise a
 *         future of the ddtlOutput result, with any failure of that future mapped to a failed
 *         result ("resource-denied-transport")
 */
630 public Future<RpcResult<UnsubscribeDdtlOutput>> unsubscribeDdtl() {
631 LOG.debug("Received unsubscribe-ddtl.");
633 if (idIntsDdtl == null || ddtlReg == null) {
634 return failure(newError(ErrorType.RPC, "Ddtl missing.", "No DOMDataTreeListener registered."));
637 final ListenableFuture<DataListenerState> future = idIntsDdtl.tryFinishProcessing(ddtlReg);
// The fallback turns a failed future into a successfully completed future holding a failed
// RpcResult, so callers always receive an RpcResult.
640 return Futures.withFallback(Futures.transform(future, this::ddtlOutput),
641 t -> failure(newError(ErrorType.RPC, "resource-denied-transport", "Failed to finish processing",
642 "clustering-it", "clustering-it", t)));
/**
 * Builds the unsubscribe-ddtl result from the listener's final state: creates a temporary
 * datastore client actor for the id-ints shard, reads ProduceTransactionsHandler.ID_INT through
 * it and reports in copy-matches whether the read data equals the last data the listener saw.
 * NOTE(review): several structural lines (try/finally headers, the timeout unit, the final
 * return) are not visible in this excerpt.
 *
 * @param state final state reported by the id-ints DOMDataTreeListener
 * @return a failed result when the listener saw no changes ("No notification received."), when
 *         the client cannot be created ("Unable to create ds client for read.") or when the read
 *         fails ("Read failed.")
 */
645 private RpcResult<UnsubscribeDdtlOutput> ddtlOutput(final DataListenerState state) {
646 if (state.changeCount() == 0) {
647 return failed(newError(ErrorType.APPLICATION, "No notification received.",
648 "id-ints listener has not received any notifications."));
// The shard name is derived from ID_INTS_YID, while the read further down uses ID_INT.
651 final String shardName = ClusterUtils.getCleanShardName(ProduceTransactionsHandler.ID_INTS_YID);
652 LOG.debug("Creating distributed datastore client for shard {}", shardName);
654 final ActorContext actorContext = configDataStore.getActorContext();
655 final Props distributedDataStoreClientProps = SimpleDataStoreClientActor.props(
656 actorContext.getCurrentMemberName(), "Shard-" + shardName, actorContext, shardName);
// A new client actor is started in this provider's actor system for this single read.
658 final ActorRef clientActor = actorSystem.actorOf(distributedDataStoreClientProps);
659 final DataStoreClient distributedDataStoreClient;
// The literal 30 is presumably a timeout; its unit is on a line not visible here — TODO confirm.
661 distributedDataStoreClient = SimpleDataStoreClientActor.getDistributedDataStoreClient(clientActor, 30,
663 } catch (final Exception e) {
664 LOG.error("Failed to get actor for {}", distributedDataStoreClientProps, e);
// No client was obtained, so only the freshly started actor is stopped before bailing out.
665 clientActor.tell(PoisonPill.getInstance(), noSender());
666 return failed(newError(ErrorType.APPLICATION, "Unable to create ds client for read.",
667 "Unable to create ds client for read."));
// The read is issued through a transaction created on a new local history.
670 final ClientLocalHistory localHistory = distributedDataStoreClient.createLocalHistory();
671 final ClientTransaction tx = localHistory.createTransaction();
672 final CheckedFuture<Optional<NormalizedNode<?, ?>>,
673 org.opendaylight.mdsal.common.api.ReadFailedException> read =
674 tx.read(YangInstanceIdentifier.of(ProduceTransactionsHandler.ID_INT));
// NOTE(review): line(s) between issuing the read and closing the history are not visible here.
677 localHistory.close();
679 final Optional<NormalizedNode<?, ?>> readResult;
681 readResult = read.checkedGet();
682 } catch (org.opendaylight.mdsal.common.api.ReadFailedException e) {
683 LOG.error("Unable to read data to verify ddtl data.", e);
684 return failed(newError( ErrorType.APPLICATION, "Read failed.", "Final read from id-ints failed."));
// NOTE(review): these cleanup calls presumably sit in a finally block whose header is not visible
// in this excerpt — confirm they also run on the failure return above.
686 distributedDataStoreClient.close();
687 clientActor.tell(PoisonPill.getInstance(), noSender());
// Absent values on either side become null, so two absent values compare as equal.
691 final NormalizedNode<?, ?> expected = state.lastData().orNull();
692 final NormalizedNode<?, ?> actual = readResult.orNull();
693 final boolean equal = Objects.equals(expected, actual);
695 LOG.debug("Expected result {} read resulted in {}", expected, actual);
697 final RpcResultBuilder<UnsubscribeDdtlOutput> b = RpcResultBuilder.success(new UnsubscribeDdtlOutputBuilder()
698 .setCopyMatches(equal).build());
// Disabled code: would have attached a warning per listener violation to the result.
700 // for (DataListenerViolation violation : state.violations()) {
701 // final Optional<NormalizedNodeDiff> diff = violation.toDiff();
702 // if (diff.isPresent()) {
703 // b.withWarning(ErrorType.APPLICATION, "Sequence mismatch", diff.get().toString());
/**
 * Builds a failed RpcResult carrying the single given error.
 *
 * @param error error to attach to the result
 * @param <T> result payload type
 * @return the failed result (not wrapped in a future)
 */
710 private static <T> RpcResult<T> failed(final RpcError error) {
711 return RpcResultBuilder.<T>failed().withRpcError(error).build();
/**
 * Wraps the failed RpcResult for the given error in an already-completed future.
 *
 * @param error error to attach to the result
 * @param <T> result payload type
 * @return immediate future holding the result of failed(error)
 */
714 private static <T> ListenableFuture<RpcResult<T>> failure(final RpcError error) {
715 return Futures.immediateFuture(failed(error));
/**
 * Wraps a successful RpcResult carrying the given payload in an already-completed future.
 *
 * @param result payload of the result; callers in this class pass null for payload-less RPCs
 * @param <T> result payload type
 * @return immediate future holding the successful result
 */
718 private static <T> ListenableFuture<RpcResult<T>> success(final T result) {
719 return Futures.immediateFuture(RpcResultBuilder.success(result).build());