2 * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.clustering.it.provider;
11 import static akka.actor.ActorRef.noSender;
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSystem;
15 import akka.actor.PoisonPill;
16 import akka.actor.Props;
17 import akka.dispatch.OnComplete;
18 import akka.pattern.Patterns;
19 import com.google.common.base.Optional;
20 import com.google.common.base.Strings;
21 import com.google.common.util.concurrent.CheckedFuture;
22 import com.google.common.util.concurrent.Futures;
23 import com.google.common.util.concurrent.SettableFuture;
24 import java.io.PrintWriter;
25 import java.io.StringWriter;
26 import java.util.Collections;
27 import java.util.HashMap;
29 import java.util.concurrent.Future;
30 import java.util.concurrent.TimeUnit;
31 import org.opendaylight.controller.cluster.ActorSystemProvider;
32 import org.opendaylight.controller.cluster.databroker.actors.dds.ClientLocalHistory;
33 import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
34 import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
35 import org.opendaylight.controller.cluster.databroker.actors.dds.SimpleDataStoreClientActor;
36 import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
37 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
38 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
39 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
40 import org.opendaylight.controller.cluster.sharding.DistributedShardFactory;
41 import org.opendaylight.controller.clustering.it.provider.impl.FlappingSingletonService;
42 import org.opendaylight.controller.clustering.it.provider.impl.GetConstantService;
43 import org.opendaylight.controller.clustering.it.provider.impl.IdIntsDOMDataTreeLIstener;
44 import org.opendaylight.controller.clustering.it.provider.impl.IdIntsListener;
45 import org.opendaylight.controller.clustering.it.provider.impl.PrefixLeaderHandler;
46 import org.opendaylight.controller.clustering.it.provider.impl.PrefixShardHandler;
47 import org.opendaylight.controller.clustering.it.provider.impl.ProduceTransactionsHandler;
48 import org.opendaylight.controller.clustering.it.provider.impl.PublishNotificationsTask;
49 import org.opendaylight.controller.clustering.it.provider.impl.RoutedGetConstantService;
50 import org.opendaylight.controller.clustering.it.provider.impl.SingletonGetConstantService;
51 import org.opendaylight.controller.clustering.it.provider.impl.WriteTransactionsHandler;
52 import org.opendaylight.controller.clustering.it.provider.impl.YnlListener;
53 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
54 import org.opendaylight.controller.md.sal.binding.api.NotificationService;
55 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
56 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
57 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
58 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
59 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeService;
60 import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationRegistration;
61 import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService;
62 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
63 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
64 import org.opendaylight.controller.sal.core.api.model.SchemaService;
65 import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
66 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
67 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
68 import org.opendaylight.mdsal.dom.api.DOMDataTreeLoopException;
69 import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
70 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
71 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
72 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.AddShardReplicaInput;
73 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomePrefixLeaderInput;
74 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsInput;
75 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutput;
76 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutputBuilder;
77 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CreatePrefixShardInput;
78 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.IsClientAbortedOutput;
79 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.OdlMdsalLowlevelControlService;
80 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsInput;
81 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutput;
82 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantInput;
83 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantInput;
84 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterDefaultConstantInput;
85 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantInput;
86 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardInput;
87 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemoveShardReplicaInput;
88 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ShutdownShardReplicaInput;
89 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsInput;
90 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlInput;
91 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantInput;
92 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutput;
93 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutputBuilder;
94 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutput;
95 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutputBuilder;
96 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutput;
97 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutputBuilder;
98 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlInput;
99 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlOutput;
100 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsInput;
101 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsOutput;
102 import org.opendaylight.yangtools.concepts.ListenerRegistration;
103 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
104 import org.opendaylight.yangtools.yang.common.RpcError;
105 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
106 import org.opendaylight.yangtools.yang.common.RpcResult;
107 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
108 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
109 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
110 import org.slf4j.Logger;
111 import org.slf4j.LoggerFactory;
112 import scala.concurrent.duration.FiniteDuration;
114 public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService {
// Class-wide SLF4J logger.
116 private static final Logger LOG = LoggerFactory.getLogger(MdsalLowLevelTestProvider.class);
// Controller-API CONFIGURATION datastore type. It is spelled out fully qualified because the
// mdsal LogicalDatastoreType is the one imported under the simple name.
117 private static final org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType CONTROLLER_CONFIG =
118 org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;
// Collaborators that are handed to, or derived inside, the constructor.
120 private final RpcProviderRegistry rpcRegistry;
121 private final BindingAwareBroker.RpcRegistration<OdlMdsalLowlevelControlService> registration;
122 private final DistributedShardFactory distributedShardFactory;
123 private final DistributedDataStoreInterface configDataStore;
124 private final DOMDataTreeService domDataTreeService;
125 private final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer;
126 private final DOMDataBroker domDataBroker;
127 private final NotificationPublishService notificationPublishService;
128 private final NotificationService notificationService;
129 private final SchemaService schemaService;
130 private final ClusterSingletonServiceProvider singletonService;
131 private final DOMRpcProviderService domRpcService;
132 private final PrefixLeaderHandler prefixLeaderHandler;
133 private final PrefixShardHandler prefixShardHandler;
134 private final DOMDataTreeChangeService domDataTreeChangeService;
135 private final ActorSystem actorSystem;
// Routed get-constant registrations, keyed by the context they were registered for.
// NOTE(review): the initializer continuation of this declaration is not visible in this view.
137 private Map<InstanceIdentifier<?>, DOMRpcImplementationRegistration<RoutedGetConstantService>> routedRegistrations =
// YNL listener registrations, keyed by the listener id taken from the subscribe request.
140 private Map<String, ListenerRegistration<YnlListener>> ynlRegistrations = new HashMap<>();
// Mutable per-RPC state; each field is null until the matching register/subscribe call sets it.
142 private DOMRpcImplementationRegistration<GetConstantService> globalGetConstantRegistration = null;
143 private ClusterSingletonServiceRegistration getSingletonConstantRegistration;
144 private FlappingSingletonService flappingSingletonService;
145 private ListenerRegistration<DOMDataTreeChangeListener> dtclReg;
146 private IdIntsListener idIntsListener;
// Publish-notifications tasks, keyed by the id supplied when publishing was started.
147 private Map<String, PublishNotificationsTask> publishNotificationsTasks = new HashMap<>();
148 private ListenerRegistration<IdIntsDOMDataTreeLIstener> ddtlReg;
149 private IdIntsDOMDataTreeLIstener idIntsDdtl;
/**
 * Stores the supplied services, takes the actor system from the given provider, creates the
 * prefix leader and prefix shard handlers, looks up the {@code DOMDataTreeChangeService}
 * extension on the DOM data broker and registers this instance as the implementation of
 * {@code OdlMdsalLowlevelControlService}.
 */
153 public MdsalLowLevelTestProvider(final RpcProviderRegistry rpcRegistry,
154 final DOMRpcProviderService domRpcService,
155 final ClusterSingletonServiceProvider singletonService,
156 final SchemaService schemaService,
157 final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer,
158 final NotificationPublishService notificationPublishService,
159 final NotificationService notificationService,
160 final DOMDataBroker domDataBroker,
161 final DOMDataTreeService domDataTreeService,
162 final DistributedShardFactory distributedShardFactory,
163 final DistributedDataStoreInterface configDataStore,
164 final ActorSystemProvider actorSystemProvider) {
165 this.rpcRegistry = rpcRegistry;
166 this.domRpcService = domRpcService;
167 this.singletonService = singletonService;
168 this.schemaService = schemaService;
169 this.bindingNormalizedNodeSerializer = bindingNormalizedNodeSerializer;
170 this.notificationPublishService = notificationPublishService;
171 this.notificationService = notificationService;
172 this.domDataBroker = domDataBroker;
173 this.domDataTreeService = domDataTreeService;
174 this.distributedShardFactory = distributedShardFactory;
175 this.configDataStore = configDataStore;
176 this.actorSystem = actorSystemProvider.getActorSystem();
178 this.prefixLeaderHandler = new PrefixLeaderHandler(domDataTreeService, bindingNormalizedNodeSerializer);
// Extension lookup by class; presumably yields null when the broker does not offer
// the data-tree-change extension -- TODO confirm against DOMDataBroker.
180 domDataTreeChangeService =
181 (DOMDataTreeChangeService) domDataBroker.getSupportedExtensions().get(DOMDataTreeChangeService.class);
// NOTE(review): "this" is handed to the RPC registry here, before prefixShardHandler is
// assigned on the following statement.
183 registration = rpcRegistry.addRpcImplementation(OdlMdsalLowlevelControlService.class, this);
185 prefixShardHandler = new PrefixShardHandler(distributedShardFactory, domDataTreeService,
186 bindingNormalizedNodeSerializer);
/**
 * Closes the held get-singleton-constant registration and clears the field.
 *
 * @return an already-completed future: success after the registration was closed, a failed
 *         result tagged "missing-registration" when no registration is held, or a failed
 *         result tagged "error-closing" when closing threw
 */
190 public Future<RpcResult<Void>> unregisterSingletonConstant() {
191 LOG.debug("unregister-singleton-constant");
// Nothing to close: report the missing registration instead of failing on a null field.
193 if (getSingletonConstantRegistration == null) {
194 LOG.debug("No get-singleton-constant registration present.");
195 final RpcError rpcError = RpcResultBuilder
196 .newError(ErrorType.APPLICATION, "missing-registration", "No get-singleton-constant rpc registration present.");
197 final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
198 return Futures.immediateFuture(result);
202 getSingletonConstantRegistration.close();
203 getSingletonConstantRegistration = null;
205 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
// The exception is only logged at debug level; the RPC error carries no cause.
206 } catch (final Exception e) {
207 LOG.debug("There was a problem closing the singleton constant service", e);
208 final RpcError rpcError = RpcResultBuilder
209 .newError(ErrorType.APPLICATION, "error-closing", "There was a problem closing get-singleton-constant");
210 final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
211 return Futures.immediateFuture(result);
/**
 * Creates a {@code PublishNotificationsTask} from the request's id, seconds and
 * notifications-per-second values and records it under the request id.
 *
 * @param input the start-publish-notifications request
 * @return an already-completed successful result
 */
216 public Future<RpcResult<Void>> startPublishNotifications(final StartPublishNotificationsInput input) {
217 LOG.debug("publish-notifications, input: {}", input);
219 final PublishNotificationsTask task = new PublishNotificationsTask(notificationPublishService, input.getId(),
220 input.getSeconds(), input.getNotificationsPerSecond());
// put() replaces any task previously stored under the same id.
222 publishNotificationsTasks.put(input.getId(), task);
// NOTE(review): no statement that starts the task is visible in this view -- confirm it exists.
226 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
/**
 * Creates a fresh {@code IdIntsListener} and registers a data tree change listener for
 * {@code WriteTransactionsHandler.ID_INT_YID} in the controller CONFIGURATION datastore.
 *
 * @return an already-completed future: success after registering, or a failed result tagged
 *         "Registration present." when a registration is already held
 */
230 public Future<RpcResult<Void>> subscribeDtcl() {
// Only one DTCL registration is kept at a time.
232 if (dtclReg != null) {
233 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
234 "There is already dataTreeChangeListener registered on id-ints list.");
235 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
238 idIntsListener = new IdIntsListener();
// NOTE(review): the listener argument of this call is not visible in this view.
240 dtclReg = domDataTreeChangeService
241 .registerDataTreeChangeListener(
242 new org.opendaylight.controller.md.sal.dom.api.DOMDataTreeIdentifier(
243 CONTROLLER_CONFIG, WriteTransactionsHandler.ID_INT_YID),
246 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
/**
 * Builds a {@code WriteTransactionsHandler} for the request and starts it with a fresh
 * settable future.
 *
 * @param input the write-transactions request
 * @return the future passed to the handler's {@code start}; it is not completed by this method
 */
250 public Future<RpcResult<WriteTransactionsOutput>> writeTransactions(final WriteTransactionsInput input) {
251 LOG.debug("write-transactions, input: {}", input);
253 final WriteTransactionsHandler writeTransactionsHandler = new WriteTransactionsHandler(domDataBroker, input);
255 final SettableFuture<RpcResult<WriteTransactionsOutput>> settableFuture = SettableFuture.create();
256 writeTransactionsHandler.start(settableFuture);
258 return settableFuture;
// is-client-aborted operation. NOTE(review): the method body is not visible in this view.
262 public Future<RpcResult<IsClientAbortedOutput>> isClientAborted() {
// remove-shard-replica operation. NOTE(review): the method body is not visible in this view.
267 public Future<RpcResult<Void>> removeShardReplica(RemoveShardReplicaInput input) {
/**
 * Registers a new {@code YnlListener} for the request id with the notification service and
 * remembers the registration under that id.
 *
 * @param input the subscribe-ynl request; its id is the registration key
 * @return an already-completed future: success after registering, or a failed result tagged
 *         "Registration present." when the id already has a listener
 */
272 public Future<RpcResult<Void>> subscribeYnl(final SubscribeYnlInput input) {
274 LOG.debug("subscribe-ynl, input: {}", input);
// One listener per id: a duplicate subscribe is rejected rather than replacing the old one.
276 if (ynlRegistrations.containsKey(input.getId())) {
277 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
278 "There is already ynl listener registered for this id: " + input.getId());
279 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
282 ynlRegistrations.put(input.getId(),
283 notificationService.registerNotificationListener(new YnlListener(input.getId())));
285 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
/**
 * Logs the request and delegates to {@code prefixShardHandler.onRemovePrefixShard}.
 *
 * @param input the remove-prefix-shard request
 * @return whatever future the prefix shard handler returns
 */
289 public Future<RpcResult<Void>> removePrefixShard(final RemovePrefixShardInput input) {
290 LOG.debug("remove-prefix-shard, input: {}", input);
292 return prefixShardHandler.onRemovePrefixShard(input);
/**
 * Logs the request and delegates to {@code prefixLeaderHandler.makeLeaderLocal}.
 *
 * @param input the become-prefix-leader request
 * @return whatever future the prefix leader handler returns
 */
296 public Future<RpcResult<Void>> becomePrefixLeader(final BecomePrefixLeaderInput input) {
297 LOG.debug("become-prefix-leader, input: {}", input);
299 return prefixLeaderHandler.makeLeaderLocal(input);
/**
 * Removes the routed get-constant registration stored for the request's context and closes it.
 *
 * @param input the unregister-bound-constant request; its context is the lookup key
 * @return an already-completed future: success after closing, or a failed result tagged
 *         "missing-registration" when nothing was stored for that context
 */
303 public Future<RpcResult<Void>> unregisterBoundConstant(final UnregisterBoundConstantInput input) {
304 LOG.debug("unregister-bound-constant, {}", input);
// remove() both looks the entry up and drops it from the map; this local shadows the
// "registration" field of the class.
306 final DOMRpcImplementationRegistration<RoutedGetConstantService> registration =
307 routedRegistrations.remove(input.getContext());
309 if (registration == null) {
310 LOG.debug("No get-contexted-constant registration for context: {}", input.getContext());
311 final RpcError rpcError = RpcResultBuilder
312 .newError(ErrorType.APPLICATION, "missing-registration", "No get-constant rpc registration present.");
313 final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
314 return Futures.immediateFuture(result);
317 registration.close();
318 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
/**
 * Registers a new singleton get-constant service for the request's constant and stores the
 * resulting registration.
 *
 * @param input the register-singleton-constant request
 * @return an already-completed future: success after registering, or a failed result tagged
 *         "Invalid input." when the constant is null
 */
322 public Future<RpcResult<Void>> registerSingletonConstant(final RegisterSingletonConstantInput input) {
324 LOG.debug("Received register-singleton-constant rpc, input: {}", input);
326 if (input.getConstant() == null) {
327 final RpcError error = RpcResultBuilder.newError(
328 ErrorType.RPC, "Invalid input.", "Constant value is null");
329 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
// NOTE(review): in the visible lines an existing registration is overwritten without being
// closed or checked -- confirm this is intended.
332 getSingletonConstantRegistration =
333 SingletonGetConstantService.registerNew(singletonService, domRpcService, input.getConstant());
335 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
// register-default-constant operation. NOTE(review): the method body is not visible in this view.
339 public Future<RpcResult<Void>> registerDefaultConstant(RegisterDefaultConstantInput input) {
/**
 * Closes the global get-constant registration and clears the field.
 *
 * @return an already-completed future: success after closing, or a failed result tagged
 *         "missing-registration" when no global registration is held
 */
344 public Future<RpcResult<Void>> unregisterConstant() {
346 if (globalGetConstantRegistration == null) {
347 final RpcError rpcError = RpcResultBuilder
348 .newError(ErrorType.APPLICATION, "missing-registration", "No get-constant rpc registration present.");
349 final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
350 return Futures.immediateFuture(result);
// Clearing the field lets a later register-constant call succeed again.
353 globalGetConstantRegistration.close();
354 globalGetConstantRegistration = null;
356 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
/**
 * Deactivates the flapping singleton service, clears the field and reports the flap count
 * returned by {@code setInactive()}.
 *
 * @return an already-completed future: success carrying the flap count, or a failed result
 *         tagged "missing-registration" when no flapping singleton is held
 */
360 public Future<RpcResult<UnregisterFlappingSingletonOutput>> unregisterFlappingSingleton() {
361 LOG.debug("unregister-flapping-singleton received.");
363 if (flappingSingletonService == null) {
364 final RpcError rpcError = RpcResultBuilder
365 .newError(ErrorType.APPLICATION, "missing-registration", "No flapping-singleton registration present.");
366 final RpcResult<UnregisterFlappingSingletonOutput> result =
367 RpcResultBuilder.<UnregisterFlappingSingletonOutput>failed().withRpcError(rpcError).build();
368 return Futures.immediateFuture(result);
// setInactive() yields the value that is reported back as the flap count.
371 final long flapCount = flappingSingletonService.setInactive();
372 flappingSingletonService = null;
374 final UnregisterFlappingSingletonOutput output =
375 new UnregisterFlappingSingletonOutputBuilder().setFlapCount(flapCount).build();
377 return Futures.immediateFuture(RpcResultBuilder.success(output).build());
// add-shard-replica operation. NOTE(review): the method body is not visible in this view.
381 public Future<RpcResult<Void>> addShardReplica(AddShardReplicaInput input) {
/**
 * Creates a fresh {@code IdIntsDOMDataTreeLIstener} and registers it with the DOM data tree
 * service for {@code ProduceTransactionsHandler.ID_INT_YID} in the CONFIGURATION datastore.
 *
 * @return an already-completed future: a failed result tagged "Registration present." when a
 *         registration is already held, otherwise success on the visible path
 */
386 public Future<RpcResult<Void>> subscribeDdtl() {
// Only one DDTL registration is kept at a time.
388 if (ddtlReg != null) {
389 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
390 "There is already dataTreeChangeListener registered on id-ints list.");
391 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
394 idIntsDdtl = new IdIntsDOMDataTreeLIstener();
// NOTE(review): the assignment target of this call (presumably ddtlReg) is not visible in
// this view -- confirm the registration is stored.
398 domDataTreeService.registerListener(idIntsDdtl,
399 Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION,
400 ProduceTransactionsHandler.ID_INT_YID))
401 , true, Collections.emptyList());
402 } catch (DOMDataTreeLoopException e) {
403 LOG.error("Failed to register DOMDataTreeListener.", e);
// NOTE(review): what follows the error log inside this catch is not visible in this view.
407 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
/**
 * Registers a routed get-constant service for the request's constant and context and stores
 * the registration under that context.
 *
 * @param input the register-bound-constant request
 * @return an already-completed future: success after registering, a failed result tagged
 *         "Invalid input." when the context or the constant is null, or a failed result tagged
 *         "Registration present." when the context already has a registration
 */
411 public Future<RpcResult<Void>> registerBoundConstant(final RegisterBoundConstantInput input) {
412 LOG.debug("register-bound-constant: {}", input);
// Input validation: context first, then constant.
414 if (input.getContext() == null) {
415 final RpcError error = RpcResultBuilder.newError(
416 ErrorType.RPC, "Invalid input.", "Context value is null");
417 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
420 if (input.getConstant() == null) {
421 final RpcError error = RpcResultBuilder.newError(
422 ErrorType.RPC, "Invalid input.", "Constant value is null");
423 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
// One registration per context.
426 if (routedRegistrations.containsKey(input.getContext())) {
427 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
428 "There is already a rpc registered for context: " + input.getContext());
429 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
// This local shadows the "registration" field of the class.
432 final DOMRpcImplementationRegistration<RoutedGetConstantService> registration =
433 RoutedGetConstantService.registerNew(bindingNormalizedNodeSerializer, domRpcService,
434 input.getConstant(), input.getContext());
436 routedRegistrations.put(input.getContext(), registration);
437 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
/**
 * Creates a {@code FlappingSingletonService} on top of the cluster singleton service
 * provider and stores it.
 *
 * @return an already-completed future: success after creating the service, or a failed
 *         result tagged "Registration present." when one is already held
 */
441 public Future<RpcResult<Void>> registerFlappingSingleton() {
442 LOG.debug("Received register-flapping-singleton.");
444 if (flappingSingletonService != null) {
445 final RpcError error = RpcResultBuilder.newError(
446 ErrorType.RPC, "Registration present.", "flapping-singleton already registered");
447 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
450 flappingSingletonService = new FlappingSingletonService(singletonService);
452 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
/**
 * Verifies what the id-ints data tree change listener has seen against a final read of
 * {@code WriteTransactionsHandler.ID_INT_YID} from the controller CONFIGURATION datastore.
 *
 * @return an already-completed future: success carrying {@code copy-matches} (the result of
 *         {@code idIntsListener.checkEqual} on the data read), or a failed result when no
 *         listener/registration is held ("Dtcl missing."), the listener never triggered
 *         ("No notification received."), the read returned nothing ("Final read empty.") or
 *         the read threw {@code ReadFailedException} ("Read failed.")
 */
456 public Future<RpcResult<UnsubscribeDtclOutput>> unsubscribeDtcl() {
457 LOG.debug("Received unsubscribe-dtcl");
459 if (idIntsListener == null || dtclReg == null) {
460 final RpcError error = RpcResultBuilder.newError(
461 ErrorType.RPC, "Dtcl missing.", "No DataTreeChangeListener registered.");
462 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed().withRpcError(error).build());
// NOTE(review): the statements between the guard above and the check below are not
// visible in this view.
468 if (!idIntsListener.hasTriggered()) {
// The first literal ends with a space so the concatenated message reads
// "...has not received any notifications." instead of "receivedany".
469 final RpcError error = RpcResultBuilder.newError(
470 ErrorType.APPLICATION, "No notification received.", "id-ints listener has not received " +
471 "any notifications.");
472 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
473 .withRpcError(error).build());
// Final read used as the reference copy for the comparison.
476 final DOMDataReadOnlyTransaction rTx = domDataBroker.newReadOnlyTransaction();
478 final Optional<NormalizedNode<?, ?>> readResult =
479 rTx.read(CONTROLLER_CONFIG, WriteTransactionsHandler.ID_INT_YID).checkedGet();
481 if (!readResult.isPresent()) {
482 final RpcError error = RpcResultBuilder.newError(
483 ErrorType.APPLICATION, "Final read empty.", "No data read from id-ints list.");
484 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
485 .withRpcError(error).build());
488 return Futures.immediateFuture(
489 RpcResultBuilder.success(new UnsubscribeDtclOutputBuilder()
490 .setCopyMatches(idIntsListener.checkEqual(readResult.get()))).build());
// The read failure is reported to the caller but the exception itself is not attached.
492 } catch (final ReadFailedException e) {
493 final RpcError error = RpcResultBuilder.newError(
494 ErrorType.APPLICATION, "Read failed.", "Final read from id-ints failed.");
495 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
496 .withRpcError(error).build());
/**
 * Logs the request and delegates to {@code prefixShardHandler.onCreatePrefixShard}.
 *
 * @param input the create-prefix-shard request
 * @return whatever future the prefix shard handler returns
 */
502 public Future<RpcResult<Void>> createPrefixShard(final CreatePrefixShardInput input) {
503 LOG.debug("create-prefix-shard, input: {}", input);
505 return prefixShardHandler.onCreatePrefixShard(input);
// deconfigure-id-ints-shard operation. NOTE(review): the method body is not visible in this view.
509 public Future<RpcResult<Void>> deconfigureIdIntsShard() {
/**
 * Removes the YNL listener registration stored for the request id, captures the listener's
 * output and closes the registration.
 *
 * @param input the unsubscribe-ynl request; its id is the lookup key
 * @return an already-completed future: success carrying the listener's output, or a failed
 *         result tagged "missing-registration" when no listener is stored for that id
 */
514 public Future<RpcResult<UnsubscribeYnlOutput>> unsubscribeYnl(final UnsubscribeYnlInput input) {
515 LOG.debug("Received unsubscribe-ynl, input: {}", input);
517 if (!ynlRegistrations.containsKey(input.getId())) {
518 final RpcError rpcError = RpcResultBuilder
519 .newError(ErrorType.APPLICATION, "missing-registration", "No ynl listener with this id registered.");
520 final RpcResult<UnsubscribeYnlOutput> result =
521 RpcResultBuilder.<UnsubscribeYnlOutput>failed().withRpcError(rpcError).build();
522 return Futures.immediateFuture(result);
// The output is taken from the listener before the registration is closed; this local
// shadows the "registration" field of the class.
525 final ListenerRegistration<YnlListener> registration = ynlRegistrations.remove(input.getId());
526 final UnsubscribeYnlOutput output = registration.getInstance().getOutput();
528 registration.close();
530 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeYnlOutput>success().withResult(output).build());
/**
 * Reports the state of the publish-notifications task stored under the request id: whether
 * it is still active, how many notifications it has published so far and, when present,
 * its last error.
 *
 * @param input the check-publish-notifications request; its id is the lookup key
 * @return an already-completed successful result describing the task
 */
534 public Future<RpcResult<CheckPublishNotificationsOutput>> checkPublishNotifications(
535 final CheckPublishNotificationsInput input) {
537 final PublishNotificationsTask task = publishNotificationsTasks.get(input.getId());
// NOTE(review): this early "active = false" return is presumably guarded by a null check on
// task that is not visible in this view -- confirm.
540 return Futures.immediateFuture(RpcResultBuilder.success(
541 new CheckPublishNotificationsOutputBuilder().setActive(false)).build());
// "active" is the negation of the task's finished flag.
544 final CheckPublishNotificationsOutputBuilder checkPublishNotificationsOutputBuilder =
545 new CheckPublishNotificationsOutputBuilder().setActive(!task.isFinished());
// The last error is rendered as its toString() immediately followed by its stack trace,
// with no separator between the two.
547 if (task.getLastError() != null) {
548 final StringWriter sw = new StringWriter();
549 final PrintWriter pw = new PrintWriter(sw);
550 task.getLastError().printStackTrace(pw);
551 checkPublishNotificationsOutputBuilder.setLastError(task.getLastError().toString() + sw.toString());
554 final CheckPublishNotificationsOutput output =
555 checkPublishNotificationsOutputBuilder.setPublishCount(task.getCurrentNotif()).build();
557 return Futures.immediateFuture(RpcResultBuilder.success(output).build());
/**
 * Builds a {@code ProduceTransactionsHandler} for the request and starts it with a fresh
 * settable future.
 *
 * @param input the produce-transactions request
 * @return the future passed to the handler's {@code start}; it is not completed by this method
 */
561 public Future<RpcResult<ProduceTransactionsOutput>> produceTransactions(final ProduceTransactionsInput input) {
562 LOG.debug("producer-transactions, input: {}", input);
564 final ProduceTransactionsHandler handler =
565 new ProduceTransactionsHandler(domDataTreeService, input);
567 final SettableFuture<RpcResult<ProduceTransactionsOutput>> settableFuture = SettableFuture.create();
568 handler.start(settableFuture);
570 return settableFuture;
/**
 * Looks up the named local shard asynchronously and asks it to stop gracefully by sending
 * {@code Shutdown.INSTANCE}, then translates the outcome into an RPC result.
 *
 * @param input the shutdown-shard-replica request; a null or empty shard name is rejected
 *              with a "bad-element" error
 * @return NOTE(review): the return statement is not visible in this view; presumably the
 *         settable {@code rpcResult} future completed by the callbacks below -- confirm
 */
574 public Future<RpcResult<Void>> shutdownShardReplica(final ShutdownShardReplicaInput input) {
575 LOG.debug("Received shutdown-shard-replica rpc, input: {}", input);
577 final String shardName = input.getShardName();
578 if (Strings.isNullOrEmpty(shardName)) {
579 final RpcError rpcError = RpcResultBuilder.newError(ErrorType.APPLICATION, "bad-element",
580 "A valid shard name must be specified");
581 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(rpcError).build());
584 final SettableFuture<RpcResult<Void>> rpcResult = SettableFuture.create();
585 final ActorContext context = configDataStore.getActorContext();
// Stop timeout: three times the shard election timeout interval, but at least 10000 ms.
587 long timeoutInMS = Math.max(context.getDatastoreContext().getShardRaftConfig()
588 .getElectionTimeOutInterval().$times(3).toMillis(), 10000);
589 final FiniteDuration duration = FiniteDuration.apply(timeoutInMS, TimeUnit.MILLISECONDS);
590 final scala.concurrent.Promise<Boolean> shutdownShardAsk = akka.dispatch.Futures.promise();
// Stage 1: resolve the local shard actor; a lookup failure fails the promise, otherwise the
// promise is completed with the graceful-stop future.
592 context.findLocalShardAsync(shardName).onComplete(new OnComplete<ActorRef>() {
594 public void onComplete(final Throwable throwable, final ActorRef actorRef) throws Throwable {
595 if (throwable != null) {
596 shutdownShardAsk.failure(throwable);
598 shutdownShardAsk.completeWith(Patterns.gracefulStop(actorRef, duration, Shutdown.INSTANCE));
601 }, context.getClientDispatcher());
// Stage 2: map the promise outcome onto the RPC result future.
603 shutdownShardAsk.future().onComplete(new OnComplete<Boolean>() {
605 public void onComplete(final Throwable throwable, final Boolean gracefulStopResult) throws Throwable {
606 if (throwable != null) {
607 final RpcResult<Void> failedResult = RpcResultBuilder.<Void>failed()
608 .withError(ErrorType.APPLICATION, "Failed to gracefully shutdown shard", throwable).build();
609 rpcResult.set(failedResult);
611 // according to Patterns.gracefulStop API, we don't have to
612 // check value of gracefulStopResult
613 rpcResult.set(RpcResultBuilder.<Void>success().build());
616 }, context.getClientDispatcher());
/**
 * Registers the global get-constant service for the request's constant and stores the
 * registration.
 *
 * @param input the register-constant request
 * @return an already-completed future: success after registering, a failed result tagged
 *         "Invalid input." when the constant is null, or a failed result tagged
 *         "Registration present." when a global registration is already held
 */
622 public Future<RpcResult<Void>> registerConstant(final RegisterConstantInput input) {
624 LOG.debug("Received register-constant rpc, input: {}", input);
626 if (input.getConstant() == null) {
627 final RpcError error = RpcResultBuilder.newError(
628 ErrorType.RPC, "Invalid input.", "Constant value is null");
629 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
// Only one global get-constant registration is kept at a time.
632 if (globalGetConstantRegistration != null) {
633 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
634 "There is already a get-constant rpc registered.");
635 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
638 globalGetConstantRegistration = GetConstantService.registerNew(domRpcService, input.getConstant());
639 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
// unregister-default-constant operation. NOTE(review): the method body is not visible in this view.
643 public Future<RpcResult<Void>> unregisterDefaultConstant() {
/**
 * Verifies what the id-ints DOM data tree listener has seen against a final read performed
 * through a dedicated datastore client created for the id-ints shard.
 *
 * @return an already-completed future: success carrying {@code copy-matches} (the result of
 *         {@code idIntsDdtl.checkEqual} on the data read), or a failed result when no
 *         listener/registration is held ("Ddtl missing."), the listener never triggered
 *         ("No notification received."), the client could not be obtained
 *         ("Unable to create ds client for read."), or the read was empty or failed
 *         ("Read failed.")
 */
648 public Future<RpcResult<UnsubscribeDdtlOutput>> unsubscribeDdtl() {
649 LOG.debug("Received unsubscribe-ddtl.");
651 if (idIntsDdtl == null || ddtlReg == null) {
652 final RpcError error = RpcResultBuilder.newError(
653 ErrorType.RPC, "Ddtl missing.", "No DOMDataTreeListener registered.");
654 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed().withRpcError(error).build());
// NOTE(review): the statements between the guard above and the check below are not
// visible in this view.
660 if (!idIntsDdtl.hasTriggered()) {
// The first literal ends with a space so the concatenated message reads
// "...has not received any notifications." instead of "receivedany".
661 final RpcError error = RpcResultBuilder.newError(
662 ErrorType.APPLICATION, "No notification received.", "id-ints listener has not received " +
663 "any notifications.");
664 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
665 .withRpcError(error).build());
668 final String shardName = ClusterUtils.getCleanShardName(ProduceTransactionsHandler.ID_INTS_YID);
669 LOG.debug("Creating distributed datastore client for shard {}", shardName);
// A throw-away client actor is created just for the verification read.
671 final ActorContext actorContext = configDataStore.getActorContext();
672 final Props distributedDataStoreClientProps =
673 SimpleDataStoreClientActor.props(actorContext.getCurrentMemberName(),
674 "Shard-" + shardName, actorContext, shardName);
676 final ActorRef clientActor = actorSystem.actorOf(distributedDataStoreClientProps);
677 final DataStoreClient distributedDataStoreClient;
// Obtaining the client is given 30 seconds; on failure the actor is stopped with a PoisonPill.
679 distributedDataStoreClient = SimpleDataStoreClientActor
680 .getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS);
681 } catch (final Exception e) {
682 LOG.error("Failed to get actor for {}", distributedDataStoreClientProps, e);
683 clientActor.tell(PoisonPill.getInstance(), noSender());
684 final RpcError error = RpcResultBuilder.newError(
685 ErrorType.APPLICATION, "Unable to create ds client for read.",
686 "Unable to create ds client for read.");
687 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
688 .withRpcError(error).build());
691 final ClientLocalHistory localHistory = distributedDataStoreClient.createLocalHistory();
692 final ClientTransaction tx = localHistory.createTransaction();
693 final CheckedFuture<Optional<NormalizedNode<?, ?>>,
694 org.opendaylight.mdsal.common.api.ReadFailedException> read =
695 tx.read(YangInstanceIdentifier.of(ProduceTransactionsHandler.ID_INT));
// The local history is closed before the read future is awaited below.
698 localHistory.close();
700 final Optional<NormalizedNode<?, ?>> optional = read.checkedGet();
701 if (!optional.isPresent()) {
702 LOG.warn("Final read from client is empty.");
703 final RpcError error = RpcResultBuilder.newError(
704 ErrorType.APPLICATION, "Read failed.", "Final read from id-ints is empty.");
705 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
706 .withRpcError(error).build());
709 return Futures.immediateFuture(
710 RpcResultBuilder.success(new UnsubscribeDdtlOutputBuilder()
711 .setCopyMatches(idIntsDdtl.checkEqual(optional.get()))).build());
713 } catch (org.opendaylight.mdsal.common.api.ReadFailedException e) {
714 LOG.error("Unable to read data to verify ddtl data.", e);
715 final RpcError error = RpcResultBuilder.newError(
716 ErrorType.APPLICATION, "Read failed.", "Final read from id-ints failed.");
717 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
718 .withRpcError(error).build());
// Cleanup of the temporary client and its actor.
720 distributedDataStoreClient.close();
721 clientActor.tell(PoisonPill.getInstance(), noSender());