2 * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.clustering.it.provider;
11 import static akka.actor.ActorRef.noSender;
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSystem;
15 import akka.actor.PoisonPill;
16 import akka.actor.Props;
17 import akka.dispatch.OnComplete;
18 import akka.pattern.Patterns;
19 import com.google.common.base.Optional;
20 import com.google.common.base.Strings;
21 import com.google.common.util.concurrent.CheckedFuture;
22 import com.google.common.util.concurrent.Futures;
23 import com.google.common.util.concurrent.SettableFuture;
24 import java.io.PrintWriter;
25 import java.io.StringWriter;
26 import java.util.Collections;
27 import java.util.HashMap;
29 import java.util.concurrent.Future;
30 import java.util.concurrent.TimeUnit;
31 import org.opendaylight.controller.cluster.ActorSystemProvider;
32 import org.opendaylight.controller.cluster.databroker.actors.dds.ClientLocalHistory;
33 import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
34 import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
35 import org.opendaylight.controller.cluster.databroker.actors.dds.SimpleDataStoreClientActor;
36 import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
37 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
38 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
39 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
40 import org.opendaylight.controller.cluster.sharding.DistributedShardFactory;
41 import org.opendaylight.controller.clustering.it.provider.impl.FlappingSingletonService;
42 import org.opendaylight.controller.clustering.it.provider.impl.GetConstantService;
43 import org.opendaylight.controller.clustering.it.provider.impl.IdIntsDOMDataTreeLIstener;
44 import org.opendaylight.controller.clustering.it.provider.impl.IdIntsListener;
45 import org.opendaylight.controller.clustering.it.provider.impl.PrefixLeaderHandler;
46 import org.opendaylight.controller.clustering.it.provider.impl.PrefixShardHandler;
47 import org.opendaylight.controller.clustering.it.provider.impl.ProduceTransactionsHandler;
48 import org.opendaylight.controller.clustering.it.provider.impl.PublishNotificationsTask;
49 import org.opendaylight.controller.clustering.it.provider.impl.RoutedGetConstantService;
50 import org.opendaylight.controller.clustering.it.provider.impl.SingletonGetConstantService;
51 import org.opendaylight.controller.clustering.it.provider.impl.WriteTransactionsHandler;
52 import org.opendaylight.controller.clustering.it.provider.impl.YnlListener;
53 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
54 import org.opendaylight.controller.md.sal.binding.api.NotificationService;
55 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
56 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
57 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
58 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
59 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeService;
60 import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationRegistration;
61 import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService;
62 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
63 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
64 import org.opendaylight.controller.sal.core.api.model.SchemaService;
65 import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
66 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
67 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
68 import org.opendaylight.mdsal.dom.api.DOMDataTreeLoopException;
69 import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
70 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
71 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
72 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.AddShardReplicaInput;
73 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomePrefixLeaderInput;
74 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsInput;
75 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutput;
76 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutputBuilder;
77 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CreatePrefixShardInput;
78 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.IsClientAbortedOutput;
79 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.OdlMdsalLowlevelControlService;
80 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsInput;
81 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutput;
82 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantInput;
83 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantInput;
84 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterDefaultConstantInput;
85 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantInput;
86 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardInput;
87 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemoveShardReplicaInput;
88 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownPrefixShardReplicaInput;
89 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownShardReplicaInput;
90 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsInput;
91 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlInput;
92 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantInput;
93 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutput;
94 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutputBuilder;
95 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutput;
96 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutputBuilder;
97 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutput;
98 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutputBuilder;
99 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlInput;
100 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlOutput;
101 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsInput;
102 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsOutput;
103 import org.opendaylight.yangtools.concepts.ListenerRegistration;
104 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
105 import org.opendaylight.yangtools.yang.common.RpcError;
106 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
107 import org.opendaylight.yangtools.yang.common.RpcResult;
108 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
109 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
110 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
111 import org.slf4j.Logger;
112 import org.slf4j.LoggerFactory;
113 import scala.concurrent.duration.FiniteDuration;
115 public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService {
// Class-wide logger.
117 private static final Logger LOG = LoggerFactory.getLogger(MdsalLowLevelTestProvider.class);
// The controller flavour of LogicalDatastoreType is spelled out fully-qualified because the
// org.opendaylight.mdsal.common.api.LogicalDatastoreType of the same simple name is imported.
118 private static final org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType CONTROLLER_CONFIG =
119 org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;
// Collaborators; each final field below is assigned exactly once in the constructor.
121 private final RpcProviderRegistry rpcRegistry;
// Handle returned by rpcRegistry.addRpcImplementation(...) for this provider.
122 private final BindingAwareBroker.RpcRegistration<OdlMdsalLowlevelControlService> registration;
123 private final DistributedShardFactory distributedShardFactory;
124 private final DistributedDataStoreInterface configDataStore;
125 private final DOMDataTreeService domDataTreeService;
126 private final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer;
127 private final DOMDataBroker domDataBroker;
128 private final NotificationPublishService notificationPublishService;
129 private final NotificationService notificationService;
130 private final SchemaService schemaService;
131 private final ClusterSingletonServiceProvider singletonService;
132 private final DOMRpcProviderService domRpcService;
133 private final PrefixLeaderHandler prefixLeaderHandler;
134 private final PrefixShardHandler prefixShardHandler;
// Looked up from domDataBroker.getSupportedExtensions() in the constructor.
135 private final DOMDataTreeChangeService domDataTreeChangeService;
136 private final ActorSystem actorSystem;
// Routed get-constant registrations keyed by context (see registerBoundConstant/unregisterBoundConstant).
// NOTE(review): the initializer that follows the '=' is not visible in this view — confirm.
138 private Map<InstanceIdentifier<?>, DOMRpcImplementationRegistration<RoutedGetConstantService>> routedRegistrations =
// Notification listener registrations keyed by the id supplied to subscribeYnl.
141 private Map<String, ListenerRegistration<YnlListener>> ynlRegistrations = new HashMap<>();
// Mutable state below is set and cleared by the individual RPC methods; null means "not registered".
143 private DOMRpcImplementationRegistration<GetConstantService> globalGetConstantRegistration = null;
144 private ClusterSingletonServiceRegistration getSingletonConstantRegistration;
145 private FlappingSingletonService flappingSingletonService;
146 private ListenerRegistration<DOMDataTreeChangeListener> dtclReg;
147 private IdIntsListener idIntsListener;
// Publish tasks keyed by the id supplied to startPublishNotifications.
148 private Map<String, PublishNotificationsTask> publishNotificationsTasks = new HashMap<>();
149 private ListenerRegistration<IdIntsDOMDataTreeLIstener> ddtlReg;
150 private IdIntsDOMDataTreeLIstener idIntsDdtl;
/**
 * Creates the provider, stores its collaborators, builds the prefix leader/shard handlers and
 * registers this instance as the {@link OdlMdsalLowlevelControlService} RPC implementation.
 *
 * <p>The RPC registration is deliberately the last statement: it hands {@code this} to the RPC
 * registry, so every final field (including {@code prefixShardHandler}) must already be assigned
 * before an RPC can possibly be dispatched to this instance.
 *
 * @param rpcRegistry registry this provider registers itself with
 * @param domRpcService DOM RPC provider used by the get-constant registrations
 * @param singletonService cluster singleton service provider
 * @param schemaService schema service
 * @param bindingNormalizedNodeSerializer binding/DOM codec
 * @param notificationPublishService service used by the publish-notification tasks
 * @param notificationService service used to register notification listeners
 * @param domDataBroker DOM data broker; must expose the {@link DOMDataTreeChangeService} extension
 * @param domDataTreeService DOM data tree service
 * @param distributedShardFactory factory handed to the prefix shard handler
 * @param configDataStore configuration datastore
 * @param actorSystemProvider source of the actor system
 */
public MdsalLowLevelTestProvider(final RpcProviderRegistry rpcRegistry,
        final DOMRpcProviderService domRpcService,
        final ClusterSingletonServiceProvider singletonService,
        final SchemaService schemaService,
        final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer,
        final NotificationPublishService notificationPublishService,
        final NotificationService notificationService,
        final DOMDataBroker domDataBroker,
        final DOMDataTreeService domDataTreeService,
        final DistributedShardFactory distributedShardFactory,
        final DistributedDataStoreInterface configDataStore,
        final ActorSystemProvider actorSystemProvider) {
    this.rpcRegistry = rpcRegistry;
    this.domRpcService = domRpcService;
    this.singletonService = singletonService;
    this.schemaService = schemaService;
    this.bindingNormalizedNodeSerializer = bindingNormalizedNodeSerializer;
    this.notificationPublishService = notificationPublishService;
    this.notificationService = notificationService;
    this.domDataBroker = domDataBroker;
    this.domDataTreeService = domDataTreeService;
    this.distributedShardFactory = distributedShardFactory;
    this.configDataStore = configDataStore;
    this.actorSystem = actorSystemProvider.getActorSystem();

    this.prefixLeaderHandler = new PrefixLeaderHandler(domDataTreeService, bindingNormalizedNodeSerializer);
    this.prefixShardHandler = new PrefixShardHandler(distributedShardFactory, domDataTreeService,
            bindingNormalizedNodeSerializer);

    this.domDataTreeChangeService =
            (DOMDataTreeChangeService) domDataBroker.getSupportedExtensions().get(DOMDataTreeChangeService.class);

    // Publish 'this' only once every field has been initialized.
    this.registration = rpcRegistry.addRpcImplementation(OdlMdsalLowlevelControlService.class, this);
}
191 public Future<RpcResult<Void>> unregisterSingletonConstant() {
192 LOG.debug("unregister-singleton-constant");
194 if (getSingletonConstantRegistration == null) {
195 LOG.debug("No get-singleton-constant registration present.");
196 final RpcError rpcError = RpcResultBuilder
197 .newError(ErrorType.APPLICATION, "missing-registration", "No get-singleton-constant rpc registration present.");
198 final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
199 return Futures.immediateFuture(result);
203 getSingletonConstantRegistration.close();
204 getSingletonConstantRegistration = null;
206 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
207 } catch (final Exception e) {
208 LOG.debug("There was a problem closing the singleton constant service", e);
209 final RpcError rpcError = RpcResultBuilder
210 .newError(ErrorType.APPLICATION, "error-closing", "There was a problem closing get-singleton-constant");
211 final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
212 return Futures.immediateFuture(result);
// Builds a PublishNotificationsTask from the input's id, seconds and notifications-per-second
// values, stores it in publishNotificationsTasks under the input id (replacing any task already
// stored under that id) and returns an already-completed successful result.
217 public Future<RpcResult<Void>> startPublishNotifications(final StartPublishNotificationsInput input) {
218 LOG.debug("publish-notifications, input: {}", input);
220 final PublishNotificationsTask task = new PublishNotificationsTask(notificationPublishService, input.getId(),
221 input.getSeconds(), input.getNotificationsPerSecond());
223 publishNotificationsTasks.put(input.getId(), task);
// NOTE(review): the embedded numbering jumps from 223 to 227; whatever sits in between
// (presumably the call that starts the task) is not visible in this view — confirm.
227 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
// Registers a data tree change listener on WriteTransactionsHandler.ID_INT_YID in the
// controller CONFIGURATION datastore and keeps the registration in dtclReg.
// Fails with a "Registration present." error when dtclReg is already set.
231 public Future<RpcResult<Void>> subscribeDtcl() {
233 if (dtclReg != null) {
234 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
235 "There is already dataTreeChangeListener registered on id-ints list.");
236 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
// A fresh listener instance is created for every subscription.
239 idIntsListener = new IdIntsListener();
241 dtclReg = domDataTreeChangeService
242 .registerDataTreeChangeListener(
243 new org.opendaylight.controller.md.sal.dom.api.DOMDataTreeIdentifier(
244 CONTROLLER_CONFIG, WriteTransactionsHandler.ID_INT_YID),
// NOTE(review): the second argument of registerDataTreeChangeListener is on a line not visible
// in this view (presumably idIntsListener) — confirm.
247 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
251 public Future<RpcResult<WriteTransactionsOutput>> writeTransactions(final WriteTransactionsInput input) {
252 LOG.debug("write-transactions, input: {}", input);
254 final WriteTransactionsHandler writeTransactionsHandler = new WriteTransactionsHandler(domDataBroker, input);
256 final SettableFuture<RpcResult<WriteTransactionsOutput>> settableFuture = SettableFuture.create();
257 writeTransactionsHandler.start(settableFuture);
259 return settableFuture;
// Signature only: the statements of isClientAborted() are not visible in this view,
// so nothing about its behavior is documented here.
263 public Future<RpcResult<IsClientAbortedOutput>> isClientAborted() {
// Signature only: the statements of removeShardReplica() are not visible in this view,
// so nothing about its behavior is documented here.
268 public Future<RpcResult<Void>> removeShardReplica(RemoveShardReplicaInput input) {
273 public Future<RpcResult<Void>> subscribeYnl(final SubscribeYnlInput input) {
275 LOG.debug("subscribe-ynl, input: {}", input);
277 if (ynlRegistrations.containsKey(input.getId())) {
278 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
279 "There is already ynl listener registered for this id: " + input.getId());
280 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
283 ynlRegistrations.put(input.getId(),
284 notificationService.registerNotificationListener(new YnlListener(input.getId())));
286 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
290 public Future<RpcResult<Void>> removePrefixShard(final RemovePrefixShardInput input) {
291 LOG.debug("remove-prefix-shard, input: {}", input);
293 return prefixShardHandler.onRemovePrefixShard(input);
297 public Future<RpcResult<Void>> becomePrefixLeader(final BecomePrefixLeaderInput input) {
298 LOG.debug("become-prefix-leader, input: {}", input);
300 return prefixLeaderHandler.makeLeaderLocal(input);
304 public Future<RpcResult<Void>> unregisterBoundConstant(final UnregisterBoundConstantInput input) {
305 LOG.debug("unregister-bound-constant, {}", input);
307 final DOMRpcImplementationRegistration<RoutedGetConstantService> registration =
308 routedRegistrations.remove(input.getContext());
310 if (registration == null) {
311 LOG.debug("No get-contexted-constant registration for context: {}", input.getContext());
312 final RpcError rpcError = RpcResultBuilder
313 .newError(ErrorType.APPLICATION, "missing-registration", "No get-constant rpc registration present.");
314 final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
315 return Futures.immediateFuture(result);
318 registration.close();
319 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
323 public Future<RpcResult<Void>> registerSingletonConstant(final RegisterSingletonConstantInput input) {
325 LOG.debug("Received register-singleton-constant rpc, input: {}", input);
327 if (input.getConstant() == null) {
328 final RpcError error = RpcResultBuilder.newError(
329 ErrorType.RPC, "Invalid input.", "Constant value is null");
330 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
333 getSingletonConstantRegistration =
334 SingletonGetConstantService.registerNew(singletonService, domRpcService, input.getConstant());
336 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
// Signature only: the statements of registerDefaultConstant() are not visible in this view,
// so nothing about its behavior is documented here.
340 public Future<RpcResult<Void>> registerDefaultConstant(RegisterDefaultConstantInput input) {
345 public Future<RpcResult<Void>> unregisterConstant() {
347 if (globalGetConstantRegistration == null) {
348 final RpcError rpcError = RpcResultBuilder
349 .newError(ErrorType.APPLICATION, "missing-registration", "No get-constant rpc registration present.");
350 final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
351 return Futures.immediateFuture(result);
354 globalGetConstantRegistration.close();
355 globalGetConstantRegistration = null;
357 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
361 public Future<RpcResult<UnregisterFlappingSingletonOutput>> unregisterFlappingSingleton() {
362 LOG.debug("unregister-flapping-singleton received.");
364 if (flappingSingletonService == null) {
365 final RpcError rpcError = RpcResultBuilder
366 .newError(ErrorType.APPLICATION, "missing-registration", "No flapping-singleton registration present.");
367 final RpcResult<UnregisterFlappingSingletonOutput> result =
368 RpcResultBuilder.<UnregisterFlappingSingletonOutput>failed().withRpcError(rpcError).build();
369 return Futures.immediateFuture(result);
372 final long flapCount = flappingSingletonService.setInactive();
373 flappingSingletonService = null;
375 final UnregisterFlappingSingletonOutput output =
376 new UnregisterFlappingSingletonOutputBuilder().setFlapCount(flapCount).build();
378 return Futures.immediateFuture(RpcResultBuilder.success(output).build());
// Signature only: the statements of addShardReplica() are not visible in this view,
// so nothing about its behavior is documented here.
382 public Future<RpcResult<Void>> addShardReplica(AddShardReplicaInput input) {
// Registers a fresh IdIntsDOMDataTreeLIstener with domDataTreeService for
// ProduceTransactionsHandler.ID_INT_YID in the CONFIGURATION datastore.
// Fails with a "Registration present." error when ddtlReg is already set.
387 public Future<RpcResult<Void>> subscribeDdtl() {
389 if (ddtlReg != null) {
390 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
391 "There is already dataTreeChangeListener registered on id-ints list.");
392 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
395 idIntsDdtl = new IdIntsDOMDataTreeLIstener();
// NOTE(review): the embedded numbering jumps from 395 to 399; the opening of the try block and
// presumably the assignment of the registerListener result to ddtlReg are not visible — confirm.
399 domDataTreeService.registerListener(idIntsDdtl,
400 Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION,
401 ProduceTransactionsHandler.ID_INT_YID))
402 , true, Collections.emptyList());
403 } catch (DOMDataTreeLoopException e) {
404 LOG.error("Failed to register DOMDataTreeListener.", e);
// NOTE(review): unless a line hidden between 404 and 408 returns early, a failed registration
// is only logged and the RPC still reports success below — confirm whether that is intended.
408 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
412 public Future<RpcResult<Void>> registerBoundConstant(final RegisterBoundConstantInput input) {
413 LOG.debug("register-bound-constant: {}", input);
415 if (input.getContext() == null) {
416 final RpcError error = RpcResultBuilder.newError(
417 ErrorType.RPC, "Invalid input.", "Context value is null");
418 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
421 if (input.getConstant() == null) {
422 final RpcError error = RpcResultBuilder.newError(
423 ErrorType.RPC, "Invalid input.", "Constant value is null");
424 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
427 if (routedRegistrations.containsKey(input.getContext())) {
428 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
429 "There is already a rpc registered for context: " + input.getContext());
430 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
433 final DOMRpcImplementationRegistration<RoutedGetConstantService> registration =
434 RoutedGetConstantService.registerNew(bindingNormalizedNodeSerializer, domRpcService,
435 input.getConstant(), input.getContext());
437 routedRegistrations.put(input.getContext(), registration);
438 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
442 public Future<RpcResult<Void>> registerFlappingSingleton() {
443 LOG.debug("Received register-flapping-singleton.");
445 if (flappingSingletonService != null) {
446 final RpcError error = RpcResultBuilder.newError(
447 ErrorType.RPC, "Registration present.", "flapping-singleton already registered");
448 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
451 flappingSingletonService = new FlappingSingletonService(singletonService);
453 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
// Compares what idIntsListener has observed with a final read of
// WriteTransactionsHandler.ID_INT_YID from the controller CONFIGURATION datastore and reports
// the outcome via setCopyMatches. Each failure path returns an already-completed failed result.
457 public Future<RpcResult<UnsubscribeDtclOutput>> unsubscribeDtcl() {
458 LOG.debug("Received unsubscribe-dtcl");
// Nothing to verify unless both the listener and its registration exist.
460 if (idIntsListener == null || dtclReg == null) {
461 final RpcError error = RpcResultBuilder.newError(
462 ErrorType.RPC, "Dtcl missing.", "No DataTreeChangeListener registered.");
463 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed().withRpcError(error).build());
// NOTE(review): the embedded numbering jumps from 463 to 469; the hidden lines presumably close
// and clear dtclReg, but that is not visible in this view — confirm.
469 if (!idIntsListener.hasTriggered()) {
// NOTE(review): the two concatenated literals have no space between "received" and "any",
// so the emitted message reads "...has not receivedany notifications."
470 final RpcError error = RpcResultBuilder.newError(
471 ErrorType.APPLICATION, "No notification received.", "id-ints listener has not received" +
472 "any notifications.");
473 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
474 .withRpcError(error).build());
477 final DOMDataReadOnlyTransaction rTx = domDataBroker.newReadOnlyTransaction();
// checkedGet() blocks for the read result and throws ReadFailedException (handled below).
479 final Optional<NormalizedNode<?, ?>> readResult =
480 rTx.read(CONTROLLER_CONFIG, WriteTransactionsHandler.ID_INT_YID).checkedGet();
482 if (!readResult.isPresent()) {
483 final RpcError error = RpcResultBuilder.newError(
484 ErrorType.APPLICATION, "Final read empty.", "No data read from id-ints list.");
485 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
486 .withRpcError(error).build());
// Success: copy-matches is whatever the listener's checkEqual says about the data just read.
489 return Futures.immediateFuture(
490 RpcResultBuilder.success(new UnsubscribeDtclOutputBuilder()
491 .setCopyMatches(idIntsListener.checkEqual(readResult.get()))).build());
// NOTE(review): the caught exception is neither logged nor attached to the returned error.
493 } catch (final ReadFailedException e) {
494 final RpcError error = RpcResultBuilder.newError(
495 ErrorType.APPLICATION, "Read failed.", "Final read from id-ints failed.");
496 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
497 .withRpcError(error).build());
503 public Future<RpcResult<Void>> createPrefixShard(final CreatePrefixShardInput input) {
504 LOG.debug("create-prefix-shard, input: {}", input);
506 return prefixShardHandler.onCreatePrefixShard(input);
// Signature only: the statements of deconfigureIdIntsShard() are not visible in this view,
// so nothing about its behavior is documented here.
510 public Future<RpcResult<Void>> deconfigureIdIntsShard() {
515 public Future<RpcResult<UnsubscribeYnlOutput>> unsubscribeYnl(final UnsubscribeYnlInput input) {
516 LOG.debug("Received unsubscribe-ynl, input: {}", input);
518 if (!ynlRegistrations.containsKey(input.getId())) {
519 final RpcError rpcError = RpcResultBuilder
520 .newError(ErrorType.APPLICATION, "missing-registration", "No ynl listener with this id registered.");
521 final RpcResult<UnsubscribeYnlOutput> result =
522 RpcResultBuilder.<UnsubscribeYnlOutput>failed().withRpcError(rpcError).build();
523 return Futures.immediateFuture(result);
526 final ListenerRegistration<YnlListener> registration = ynlRegistrations.remove(input.getId());
527 final UnsubscribeYnlOutput output = registration.getInstance().getOutput();
529 registration.close();
531 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeYnlOutput>success().withResult(output).build());
// Reports the state of the publish task stored under the input id: whether it is still active
// (i.e. not finished), its last error if any, and its current notification count.
535 public Future<RpcResult<CheckPublishNotificationsOutput>> checkPublishNotifications(
536 final CheckPublishNotificationsInput input) {
538 final PublishNotificationsTask task = publishNotificationsTasks.get(input.getId());
// NOTE(review): the condition guarding this early "active = false" return is on a line not
// visible in this view (presumably a null check on task) — confirm.
541 return Futures.immediateFuture(RpcResultBuilder.success(
542 new CheckPublishNotificationsOutputBuilder().setActive(false)).build());
545 final CheckPublishNotificationsOutputBuilder checkPublishNotificationsOutputBuilder =
546 new CheckPublishNotificationsOutputBuilder().setActive(!task.isFinished());
// The last error is rendered as its toString() immediately followed by its printed stack trace.
548 if (task.getLastError() != null) {
549 final StringWriter sw = new StringWriter();
550 final PrintWriter pw = new PrintWriter(sw);
551 task.getLastError().printStackTrace(pw);
552 checkPublishNotificationsOutputBuilder.setLastError(task.getLastError().toString() + sw.toString());
555 final CheckPublishNotificationsOutput output =
556 checkPublishNotificationsOutputBuilder.setPublishCount(task.getCurrentNotif()).build();
558 return Futures.immediateFuture(RpcResultBuilder.success(output).build());
562 public Future<RpcResult<ProduceTransactionsOutput>> produceTransactions(final ProduceTransactionsInput input) {
563 LOG.debug("producer-transactions, input: {}", input);
565 final ProduceTransactionsHandler handler =
566 new ProduceTransactionsHandler(domDataTreeService, input);
568 final SettableFuture<RpcResult<ProduceTransactionsOutput>> settableFuture = SettableFuture.create();
569 handler.start(settableFuture);
571 return settableFuture;
575 public Future<RpcResult<Void>> shutdownShardReplica(final ShutdownShardReplicaInput input) {
576 LOG.debug("Received shutdown-shard-replica rpc, input: {}", input);
578 final String shardName = input.getShardName();
579 if (Strings.isNullOrEmpty(shardName)) {
580 final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "bad-element",
581 "A valid shard name must be specified");
582 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(rpcError).build());
585 return shutdownShardGracefully(shardName);
589 public Future<RpcResult<Void>> shutdownPrefixShardReplica(final ShutdownPrefixShardReplicaInput input) {
590 LOG.debug("Received shutdown-prefix-shard-replica rpc, input: {}", input);
592 final InstanceIdentifier shardPrefix = input.getPrefix();
594 if (shardPrefix == null) {
595 final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "bad-element",
596 "A valid shard prefix must be specified");
597 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(rpcError).build());
600 final YangInstanceIdentifier shardPath = bindingNormalizedNodeSerializer.toYangInstanceIdentifier(shardPrefix);
601 final String cleanPrefixShardName = ClusterUtils.getCleanShardName(shardPath);
603 return shutdownShardGracefully(cleanPrefixShardName);
// Asynchronously looks up the local shard actor by name, asks it to stop with the
// Shutdown.INSTANCE message via Patterns.gracefulStop, and maps the outcome onto rpcResult.
606 private SettableFuture<RpcResult<Void>> shutdownShardGracefully(final String shardName) {
607 final SettableFuture<RpcResult<Void>> rpcResult = SettableFuture.create();
608 final ActorContext context = configDataStore.getActorContext();
// Stop timeout: three times the shard's election timeout interval, but never below 10000 ms.
610 long timeoutInMS = Math.max(context.getDatastoreContext().getShardRaftConfig()
611 .getElectionTimeOutInterval().$times(3).toMillis(), 10000);
612 final FiniteDuration duration = FiniteDuration.apply(timeoutInMS, TimeUnit.MILLISECONDS);
// Promise that bridges the shard lookup and the graceful-stop request.
613 final scala.concurrent.Promise<Boolean> shutdownShardAsk = akka.dispatch.Futures.promise();
615 context.findLocalShardAsync(shardName).onComplete(new OnComplete<ActorRef>() {
617 public void onComplete(final Throwable throwable, final ActorRef actorRef) throws Throwable {
// A failed lookup fails the promise with the lookup's throwable.
618 if (throwable != null) {
619 shutdownShardAsk.failure(throwable);
// NOTE(review): the line separating the two branches is not visible in this view
// (presumably an else) — confirm.
621 shutdownShardAsk.completeWith(Patterns.gracefulStop(actorRef, duration, Shutdown.INSTANCE));
624 }, context.getClientDispatcher());
626 shutdownShardAsk.future().onComplete(new OnComplete<Boolean>() {
628 public void onComplete(final Throwable throwable, final Boolean gracefulStopResult) throws Throwable {
// Any failure (lookup or stop) becomes a failed RPC result carrying the throwable.
629 if (throwable != null) {
630 final RpcResult<Void> failedResult = RpcResultBuilder.<Void>failed()
631 .withError(ErrorType.APPLICATION, "Failed to gracefully shutdown shard", throwable).build();
632 rpcResult.set(failedResult);
634 // according to Patterns.gracefulStop API, we don't have to
635 // check value of gracefulStopResult
636 rpcResult.set(RpcResultBuilder.<Void>success().build());
639 }, context.getClientDispatcher());
// NOTE(review): the method's return statement is not visible in this view
// (presumably it returns rpcResult) — confirm.
644 public Future<RpcResult<Void>> registerConstant(final RegisterConstantInput input) {
646 LOG.debug("Received register-constant rpc, input: {}", input);
648 if (input.getConstant() == null) {
649 final RpcError error = RpcResultBuilder.newError(
650 ErrorType.RPC, "Invalid input.", "Constant value is null");
651 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
654 if (globalGetConstantRegistration != null) {
655 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
656 "There is already a get-constant rpc registered.");
657 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
660 globalGetConstantRegistration = GetConstantService.registerNew(domRpcService, input.getConstant());
661 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
// Signature only: the statements of unregisterDefaultConstant() are not visible in this view,
// so nothing about its behavior is documented here.
665 public Future<RpcResult<Void>> unregisterDefaultConstant() {
670 public Future<RpcResult<UnsubscribeDdtlOutput>> unsubscribeDdtl() {
671 LOG.debug("Received unsubscribe-ddtl.");
673 if (idIntsDdtl == null || ddtlReg == null) {
674 final RpcError error = RpcResultBuilder.newError(
675 ErrorType.RPC, "Ddtl missing.", "No DOMDataTreeListener registered.");
676 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed().withRpcError(error).build());
682 if (!idIntsDdtl.hasTriggered()) {
683 final RpcError error = RpcResultBuilder.newError(
684 ErrorType.APPLICATION, "No notification received.", "id-ints listener has not received" +
685 "any notifications.");
686 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
687 .withRpcError(error).build());
690 final String shardName = ClusterUtils.getCleanShardName(ProduceTransactionsHandler.ID_INTS_YID);
691 LOG.debug("Creating distributed datastore client for shard {}", shardName);
693 final ActorContext actorContext = configDataStore.getActorContext();
694 final Props distributedDataStoreClientProps =
695 SimpleDataStoreClientActor.props(actorContext.getCurrentMemberName(),
696 "Shard-" + shardName, actorContext, shardName);
698 final ActorRef clientActor = actorSystem.actorOf(distributedDataStoreClientProps);
699 final DataStoreClient distributedDataStoreClient;
701 distributedDataStoreClient = SimpleDataStoreClientActor
702 .getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS);
703 } catch (final Exception e) {
704 LOG.error("Failed to get actor for {}", distributedDataStoreClientProps, e);
705 clientActor.tell(PoisonPill.getInstance(), noSender());
706 final RpcError error = RpcResultBuilder.newError(
707 ErrorType.APPLICATION, "Unable to create ds client for read.",
708 "Unable to create ds client for read.");
709 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
710 .withRpcError(error).build());
713 final ClientLocalHistory localHistory = distributedDataStoreClient.createLocalHistory();
714 final ClientTransaction tx = localHistory.createTransaction();
715 final CheckedFuture<Optional<NormalizedNode<?, ?>>,
716 org.opendaylight.mdsal.common.api.ReadFailedException> read =
717 tx.read(YangInstanceIdentifier.of(ProduceTransactionsHandler.ID_INT));
720 localHistory.close();
722 final Optional<NormalizedNode<?, ?>> optional = read.checkedGet();
723 if (!optional.isPresent()) {
724 LOG.warn("Final read from client is empty.");
725 final RpcError error = RpcResultBuilder.newError(
726 ErrorType.APPLICATION, "Read failed.", "Final read from id-ints is empty.");
727 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
728 .withRpcError(error).build());
731 return Futures.immediateFuture(
732 RpcResultBuilder.success(new UnsubscribeDdtlOutputBuilder()
733 .setCopyMatches(idIntsDdtl.checkEqual(optional.get()))).build());
735 } catch (org.opendaylight.mdsal.common.api.ReadFailedException e) {
736 LOG.error("Unable to read data to verify ddtl data.", e);
737 final RpcError error = RpcResultBuilder.newError(
738 ErrorType.APPLICATION, "Read failed.", "Final read from id-ints failed.");
739 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
740 .withRpcError(error).build());
742 distributedDataStoreClient.close();
743 clientActor.tell(PoisonPill.getInstance(), noSender());