2 * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.clustering.it.provider;
11 import static akka.actor.ActorRef.noSender;
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSystem;
15 import akka.actor.PoisonPill;
16 import akka.actor.Props;
17 import com.google.common.base.Optional;
18 import com.google.common.util.concurrent.CheckedFuture;
19 import com.google.common.util.concurrent.Futures;
20 import com.google.common.util.concurrent.SettableFuture;
21 import java.io.PrintWriter;
22 import java.io.StringWriter;
23 import java.util.Collections;
24 import java.util.HashMap;
26 import java.util.concurrent.Future;
27 import java.util.concurrent.TimeUnit;
28 import org.opendaylight.controller.cluster.ActorSystemProvider;
29 import org.opendaylight.controller.cluster.databroker.actors.dds.ClientLocalHistory;
30 import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
31 import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
32 import org.opendaylight.controller.cluster.databroker.actors.dds.SimpleDataStoreClientActor;
33 import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
34 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
35 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
36 import org.opendaylight.controller.cluster.sharding.DistributedShardFactory;
37 import org.opendaylight.controller.clustering.it.provider.impl.FlappingSingletonService;
38 import org.opendaylight.controller.clustering.it.provider.impl.GetConstantService;
39 import org.opendaylight.controller.clustering.it.provider.impl.IdIntsDOMDataTreeLIstener;
40 import org.opendaylight.controller.clustering.it.provider.impl.IdIntsListener;
41 import org.opendaylight.controller.clustering.it.provider.impl.PrefixLeaderHandler;
42 import org.opendaylight.controller.clustering.it.provider.impl.PrefixShardHandler;
43 import org.opendaylight.controller.clustering.it.provider.impl.ProduceTransactionsHandler;
44 import org.opendaylight.controller.clustering.it.provider.impl.PublishNotificationsTask;
45 import org.opendaylight.controller.clustering.it.provider.impl.RoutedGetConstantService;
46 import org.opendaylight.controller.clustering.it.provider.impl.SingletonGetConstantService;
47 import org.opendaylight.controller.clustering.it.provider.impl.WriteTransactionsHandler;
48 import org.opendaylight.controller.clustering.it.provider.impl.YnlListener;
49 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
50 import org.opendaylight.controller.md.sal.binding.api.NotificationService;
51 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
52 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
53 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
54 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
55 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeService;
56 import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationRegistration;
57 import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService;
58 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
59 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
60 import org.opendaylight.controller.sal.core.api.model.SchemaService;
61 import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
62 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
63 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
64 import org.opendaylight.mdsal.dom.api.DOMDataTreeLoopException;
65 import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
66 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
67 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
68 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.AddShardReplicaInput;
69 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomePrefixLeaderInput;
70 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsInput;
71 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutput;
72 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutputBuilder;
73 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CreatePrefixShardInput;
74 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.IsClientAbortedOutput;
75 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.OdlMdsalLowlevelControlService;
76 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsInput;
77 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutput;
78 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantInput;
79 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantInput;
80 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterDefaultConstantInput;
81 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantInput;
82 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardInput;
83 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemoveShardReplicaInput;
84 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsInput;
85 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlInput;
86 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantInput;
87 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutput;
88 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutputBuilder;
89 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutput;
90 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutputBuilder;
91 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutput;
92 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutputBuilder;
93 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlInput;
94 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlOutput;
95 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsInput;
96 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsOutput;
97 import org.opendaylight.yangtools.concepts.ListenerRegistration;
98 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
99 import org.opendaylight.yangtools.yang.common.RpcError;
100 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
101 import org.opendaylight.yangtools.yang.common.RpcResult;
102 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
103 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
104 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
105 import org.slf4j.Logger;
106 import org.slf4j.LoggerFactory;
/**
 * Implementation of {@code OdlMdsalLowlevelControlService}. It holds the services handed to
 * the constructor together with the mutable registration state (RPC registrations, listeners,
 * notification publish tasks) that the individual RPC methods create and tear down.
 */
108 public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService {
110 private static final Logger LOG = LoggerFactory.getLogger(MdsalLowLevelTestProvider.class);
// Controller-API CONFIGURATION datastore type. It is spelled fully qualified because the
// simple name LogicalDatastoreType is imported from org.opendaylight.mdsal.common.api.
111 private static final org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType CONTROLLER_CONFIG =
112 org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;
// Collaborators; each of these is assigned exactly once in the constructor.
114 private final RpcProviderRegistry rpcRegistry;
115 private final BindingAwareBroker.RpcRegistration<OdlMdsalLowlevelControlService> registration;
116 private final DistributedShardFactory distributedShardFactory;
117 private final DistributedDataStoreInterface configDataStore;
118 private final DOMDataTreeService domDataTreeService;
119 private final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer;
120 private final DOMDataBroker domDataBroker;
121 private final NotificationPublishService notificationPublishService;
122 private final NotificationService notificationService;
123 private final SchemaService schemaService;
124 private final ClusterSingletonServiceProvider singletonService;
125 private final DOMRpcProviderService domRpcService;
126 private final PrefixLeaderHandler prefixLeaderHandler;
127 private final PrefixShardHandler prefixShardHandler;
128 private final DOMDataTreeChangeService domDataTreeChangeService;
129 private final ActorSystem actorSystem;
// Routed get-constant registrations, keyed by the context they were registered for.
// NOTE(review): the initializer expression on the following line(s) is not visible here.
131 private Map<InstanceIdentifier<?>, DOMRpcImplementationRegistration<RoutedGetConstantService>> routedRegistrations =
// Notification listener registrations, keyed by the id supplied to subscribeYnl.
134 private Map<String, ListenerRegistration<YnlListener>> ynlRegistrations = new HashMap<>();
// The fields below are null while the corresponding registration/listener is absent;
// the RPC methods test them against null before registering or unregistering.
136 private DOMRpcImplementationRegistration<GetConstantService> globalGetConstantRegistration = null;
137 private ClusterSingletonServiceRegistration getSingletonConstantRegistration;
138 private FlappingSingletonService flappingSingletonService;
139 private ListenerRegistration<DOMDataTreeChangeListener> dtclReg;
140 private IdIntsListener idIntsListener;
// Publish tasks, keyed by the id supplied to startPublishNotifications.
141 private Map<String, PublishNotificationsTask> publishNotificationsTasks = new HashMap<>();
142 private ListenerRegistration<IdIntsDOMDataTreeLIstener> ddtlReg;
143 private IdIntsDOMDataTreeLIstener idIntsDdtl;
/**
 * Stores the supplied services, derives the helper objects built from them and registers
 * this instance as the {@code OdlMdsalLowlevelControlService} RPC implementation.
 *
 * @param rpcRegistry registry this instance is registered with
 * @param domRpcService DOM RPC provider service used by the get-constant registrations
 * @param singletonService cluster singleton provider used by the singleton RPCs
 * @param schemaService stored in a field
 * @param bindingNormalizedNodeSerializer serializer passed on to the prefix handlers
 * @param notificationPublishService service handed to publish-notification tasks
 * @param notificationService service used to register YNL listeners
 * @param domDataBroker broker used for transactions and for the change-service lookup
 * @param domDataTreeService service used for DOMDataTreeListener registration and producers
 * @param distributedShardFactory factory passed on to the prefix shard handler
 * @param configDataStore datastore whose actor context is used by unsubscribeDdtl
 * @param actorSystemProvider source of the actor system
 */
147 public MdsalLowLevelTestProvider(final RpcProviderRegistry rpcRegistry,
148 final DOMRpcProviderService domRpcService,
149 final ClusterSingletonServiceProvider singletonService,
150 final SchemaService schemaService,
151 final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer,
152 final NotificationPublishService notificationPublishService,
153 final NotificationService notificationService,
154 final DOMDataBroker domDataBroker,
155 final DOMDataTreeService domDataTreeService,
156 final DistributedShardFactory distributedShardFactory,
157 final DistributedDataStoreInterface configDataStore,
158 final ActorSystemProvider actorSystemProvider) {
159 this.rpcRegistry = rpcRegistry;
160 this.domRpcService = domRpcService;
161 this.singletonService = singletonService;
162 this.schemaService = schemaService;
163 this.bindingNormalizedNodeSerializer = bindingNormalizedNodeSerializer;
164 this.notificationPublishService = notificationPublishService;
165 this.notificationService = notificationService;
166 this.domDataBroker = domDataBroker;
167 this.domDataTreeService = domDataTreeService;
168 this.distributedShardFactory = distributedShardFactory;
169 this.configDataStore = configDataStore;
170 this.actorSystem = actorSystemProvider.getActorSystem();
172 this.prefixLeaderHandler = new PrefixLeaderHandler(domDataTreeService, bindingNormalizedNodeSerializer);
// The change service is looked up among the broker's supported extensions and cast.
// NOTE(review): if the broker does not offer that extension this field would be null and
// subscribeDtcl would fail later - confirm the extension is always present.
174 domDataTreeChangeService =
175 (DOMDataTreeChangeService) domDataBroker.getSupportedExtensions().get(DOMDataTreeChangeService.class);
// NOTE(review): 'this' is handed to the registry before prefixShardHandler is assigned
// below; presumably no RPC can be dispatched that early - confirm.
177 registration = rpcRegistry.addRpcImplementation(OdlMdsalLowlevelControlService.class, this);
179 prefixShardHandler = new PrefixShardHandler(distributedShardFactory, domDataTreeService,
180 bindingNormalizedNodeSerializer);
/**
 * Closes the singleton get-constant registration, if there is one.
 *
 * @return an already-completed future: a failed result tagged "missing-registration" when
 *         nothing is registered, a failed result tagged "error-closing" when closing
 *         throws, otherwise a successful empty result
 */
184 public Future<RpcResult<Void>> unregisterSingletonConstant() {
185 LOG.debug("unregister-singleton-constant");
187 if (getSingletonConstantRegistration == null) {
188 LOG.debug("No get-singleton-constant registration present.");
189 final RpcError rpcError = RpcResultBuilder
190 .newError(ErrorType.APPLICATION, "missing-registration", "No get-singleton-constant rpc registration present.");
191 final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
192 return Futures.immediateFuture(result);
// Close first, then clear the field so a later call reports "missing-registration".
196 getSingletonConstantRegistration.close();
197 getSingletonConstantRegistration = null;
199 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
// Any exception from close() is reported to the caller as a failed result, not rethrown.
200 } catch (final Exception e) {
201 LOG.debug("There was a problem closing the singleton constant service", e);
202 final RpcError rpcError = RpcResultBuilder
203 .newError(ErrorType.APPLICATION, "error-closing", "There was a problem closing get-singleton-constant");
204 final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
205 return Futures.immediateFuture(result);
/**
 * Creates a {@code PublishNotificationsTask} from the input's id, seconds and
 * notifications-per-second values and records it under the input id.
 *
 * @param input request carrying the task id, duration and rate
 * @return an already-completed successful future
 */
210 public Future<RpcResult<Void>> startPublishNotifications(final StartPublishNotificationsInput input) {
211 LOG.debug("publish-notifications, input: {}", input);
213 final PublishNotificationsTask task = new PublishNotificationsTask(notificationPublishService, input.getId(),
214 input.getSeconds(), input.getNotificationsPerSecond());
// A task already stored under the same id is replaced in the map.
216 publishNotificationsTasks.put(input.getId(), task);
// NOTE(review): no statement that starts the task is visible between the put above and
// the return below - confirm the task is actually started.
220 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
/**
 * Registers a data tree change listener for {@code WriteTransactionsHandler.ID_INT_YID} in
 * the CONFIGURATION datastore, unless one is already registered.
 *
 * @return an already-completed future: failed with "Registration present." when
 *         {@code dtclReg} is already set, otherwise successful
 */
224 public Future<RpcResult<Void>> subscribeDtcl() {
226 if (dtclReg != null) {
227 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
228 "There is already dataTreeChangeListener registered on id-ints list.");
229 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
// A fresh listener instance is created for every successful subscription.
232 idIntsListener = new IdIntsListener();
// NOTE(review): the listener argument of this call is on a line not visible here;
// presumably it is idIntsListener - confirm.
234 dtclReg = domDataTreeChangeService
235 .registerDataTreeChangeListener(
236 new org.opendaylight.controller.md.sal.dom.api.DOMDataTreeIdentifier(
237 CONTROLLER_CONFIG, WriteTransactionsHandler.ID_INT_YID),
240 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
/**
 * Builds a {@code WriteTransactionsHandler} over the DOM data broker for the given input
 * and starts it with a new settable future.
 *
 * @param input request forwarded unchanged to the handler
 * @return the settable future passed to {@code start}; presumably the handler completes
 *         it when its work is done (not visible in this file)
 */
244 public Future<RpcResult<WriteTransactionsOutput>> writeTransactions(final WriteTransactionsInput input) {
245 LOG.debug("write-transactions, input: {}", input);
247 final WriteTransactionsHandler writeTransactionsHandler = new WriteTransactionsHandler(domDataBroker, input);
249 final SettableFuture<RpcResult<WriteTransactionsOutput>> settableFuture = SettableFuture.create();
250 writeTransactionsHandler.start(settableFuture);
252 return settableFuture;
// Takes no input and returns a future of RpcResult<IsClientAbortedOutput>.
// NOTE(review): the method body is not visible here, so its behaviour is left undocumented.
256 public Future<RpcResult<IsClientAbortedOutput>> isClientAborted() {
// Accepts a RemoveShardReplicaInput and returns a future of RpcResult<Void>.
// NOTE(review): the method body is not visible here, so its behaviour is left undocumented.
261 public Future<RpcResult<Void>> removeShardReplica(RemoveShardReplicaInput input) {
/**
 * Registers a new {@code YnlListener} for the id in the input and remembers the
 * registration under that id.
 *
 * @param input request carrying the listener id
 * @return an already-completed future: failed with "Registration present." when a listener
 *         is already stored for the id, otherwise successful
 */
266 public Future<RpcResult<Void>> subscribeYnl(final SubscribeYnlInput input) {
268 LOG.debug("subscribe-ynl, input: {}", input);
// Only one listener per id: a second subscribe for the same id is rejected.
270 if (ynlRegistrations.containsKey(input.getId())) {
271 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
272 "There is already ynl listener registered for this id: " + input.getId());
273 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
276 ynlRegistrations.put(input.getId(),
277 notificationService.registerNotificationListener(new YnlListener(input.getId())));
279 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
/**
 * Logs the request and delegates to {@code prefixShardHandler.onRemovePrefixShard}.
 *
 * @param input request forwarded unchanged to the handler
 * @return whatever future the handler returns
 */
283 public Future<RpcResult<Void>> removePrefixShard(final RemovePrefixShardInput input) {
284 LOG.debug("remove-prefix-shard, input: {}", input);
286 return prefixShardHandler.onRemovePrefixShard(input);
/**
 * Logs the request and delegates to {@code prefixLeaderHandler.makeLeaderLocal}.
 *
 * @param input request forwarded unchanged to the handler
 * @return whatever future the handler returns
 */
290 public Future<RpcResult<Void>> becomePrefixLeader(final BecomePrefixLeaderInput input) {
291 LOG.debug("become-prefix-leader, input: {}", input);
293 return prefixLeaderHandler.makeLeaderLocal(input);
/**
 * Removes and closes the routed get-constant registration stored for the input's context.
 *
 * @param input request carrying the context whose registration should be removed
 * @return an already-completed future: failed with "missing-registration" when nothing is
 *         stored for the context, otherwise successful
 */
297 public Future<RpcResult<Void>> unregisterBoundConstant(final UnregisterBoundConstantInput input) {
298 LOG.debug("unregister-bound-constant, {}", input);
// remove() doubles as the lookup; this local shadows the 'registration' field.
300 final DOMRpcImplementationRegistration<RoutedGetConstantService> registration =
301 routedRegistrations.remove(input.getContext());
303 if (registration == null) {
304 LOG.debug("No get-contexted-constant registration for context: {}", input.getContext());
305 final RpcError rpcError = RpcResultBuilder
306 .newError(ErrorType.APPLICATION, "missing-registration", "No get-constant rpc registration present.");
307 final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
308 return Futures.immediateFuture(result);
311 registration.close();
312 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
/**
 * Registers a singleton get-constant service for the constant carried by the input and
 * stores the resulting registration.
 *
 * @param input request carrying the constant; a null constant is rejected
 * @return an already-completed future: failed with "Invalid input." when the constant is
 *         null, otherwise successful
 */
316 public Future<RpcResult<Void>> registerSingletonConstant(final RegisterSingletonConstantInput input) {
318 LOG.debug("Received register-singleton-constant rpc, input: {}", input);
320 if (input.getConstant() == null) {
321 final RpcError error = RpcResultBuilder.newError(
322 ErrorType.RPC, "Invalid input.", "Constant value is null");
323 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
// NOTE(review): unlike registerConstant, no check for an existing registration is visible
// here; a previous registration reference would be overwritten without being closed.
326 getSingletonConstantRegistration =
327 SingletonGetConstantService.registerNew(singletonService, domRpcService, input.getConstant());
329 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
// Accepts a RegisterDefaultConstantInput and returns a future of RpcResult<Void>.
// NOTE(review): the method body is not visible here, so its behaviour is left undocumented.
333 public Future<RpcResult<Void>> registerDefaultConstant(RegisterDefaultConstantInput input) {
/**
 * Closes and clears the global get-constant registration.
 *
 * @return an already-completed future: failed with "missing-registration" when no global
 *         registration exists, otherwise successful
 */
338 public Future<RpcResult<Void>> unregisterConstant() {
340 if (globalGetConstantRegistration == null) {
341 final RpcError rpcError = RpcResultBuilder
342 .newError(ErrorType.APPLICATION, "missing-registration", "No get-constant rpc registration present.");
343 final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
344 return Futures.immediateFuture(result);
// Clearing the field is what allows registerConstant to succeed again afterwards.
347 globalGetConstantRegistration.close();
348 globalGetConstantRegistration = null;
350 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
/**
 * Deactivates the flapping singleton service and reports the value returned by its
 * {@code setInactive()} call as the flap count.
 *
 * @return an already-completed future: failed with "missing-registration" when no flapping
 *         service exists, otherwise a successful result carrying the flap count
 */
354 public Future<RpcResult<UnregisterFlappingSingletonOutput>> unregisterFlappingSingleton() {
355 LOG.debug("unregister-flapping-singleton received.");
357 if (flappingSingletonService == null) {
358 final RpcError rpcError = RpcResultBuilder
359 .newError(ErrorType.APPLICATION, "missing-registration", "No flapping-singleton registration present.");
360 final RpcResult<UnregisterFlappingSingletonOutput> result =
361 RpcResultBuilder.<UnregisterFlappingSingletonOutput>failed().withRpcError(rpcError).build();
362 return Futures.immediateFuture(result);
// The field is cleared so that registerFlappingSingleton can be called again.
365 final long flapCount = flappingSingletonService.setInactive();
366 flappingSingletonService = null;
368 final UnregisterFlappingSingletonOutput output =
369 new UnregisterFlappingSingletonOutputBuilder().setFlapCount(flapCount).build();
371 return Futures.immediateFuture(RpcResultBuilder.success(output).build());
// Accepts an AddShardReplicaInput and returns a future of RpcResult<Void>.
// NOTE(review): the method body is not visible here, so its behaviour is left undocumented.
375 public Future<RpcResult<Void>> addShardReplica(AddShardReplicaInput input) {
/**
 * Registers a new {@code IdIntsDOMDataTreeLIstener} with the DOM data tree service for
 * {@code ProduceTransactionsHandler.ID_INT_YID} in the CONFIGURATION datastore, unless a
 * registration already exists.
 *
 * @return an already-completed future: failed with "Registration present." when
 *         {@code ddtlReg} is already set, otherwise successful
 */
380 public Future<RpcResult<Void>> subscribeDdtl() {
382 if (ddtlReg != null) {
383 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
384 "There is already dataTreeChangeListener registered on id-ints list.");
385 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
388 idIntsDdtl = new IdIntsDOMDataTreeLIstener();
// NOTE(review): the statement start that receives the result of registerListener is not
// visible here; presumably the result is assigned to ddtlReg - confirm.
392 domDataTreeService.registerListener(idIntsDdtl,
393 Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION,
394 ProduceTransactionsHandler.ID_INT_YID))
395 , true, Collections.emptyList());
396 } catch (DOMDataTreeLoopException e) {
// NOTE(review): as far as the visible lines show, a registration failure is only logged
// and the method still returns success below - confirm this is intended.
397 LOG.error("Failed to register DOMDataTreeListener.", e);
401 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
/**
 * Registers a routed get-constant service for the input's constant and context and stores
 * the registration under that context.
 *
 * @param input request carrying the context and the constant; both must be non-null
 * @return an already-completed future: failed with "Invalid input." when the context or
 *         the constant is null, failed with "Registration present." when the context
 *         already has a registration, otherwise successful
 */
405 public Future<RpcResult<Void>> registerBoundConstant(final RegisterBoundConstantInput input) {
406 LOG.debug("register-bound-constant: {}", input);
408 if (input.getContext() == null) {
409 final RpcError error = RpcResultBuilder.newError(
410 ErrorType.RPC, "Invalid input.", "Context value is null");
411 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
414 if (input.getConstant() == null) {
415 final RpcError error = RpcResultBuilder.newError(
416 ErrorType.RPC, "Invalid input.", "Constant value is null");
417 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
// At most one routed registration per context.
420 if (routedRegistrations.containsKey(input.getContext())) {
421 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
422 "There is already a rpc registered for context: " + input.getContext());
423 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
// This local shadows the 'registration' field of the class.
426 final DOMRpcImplementationRegistration<RoutedGetConstantService> registration =
427 RoutedGetConstantService.registerNew(bindingNormalizedNodeSerializer, domRpcService,
428 input.getConstant(), input.getContext());
430 routedRegistrations.put(input.getContext(), registration);
431 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
/**
 * Creates a {@code FlappingSingletonService} on the cluster singleton provider, unless one
 * already exists.
 *
 * @return an already-completed future: failed with "Registration present." when a flapping
 *         service is already held, otherwise successful
 */
435 public Future<RpcResult<Void>> registerFlappingSingleton() {
436 LOG.debug("Received register-flapping-singleton.");
438 if (flappingSingletonService != null) {
// NOTE(review): the error text below reads "flappin-singleton"; likely a typo, but it is a
// runtime string and is therefore left untouched here.
439 final RpcError error = RpcResultBuilder.newError(
440 ErrorType.RPC, "Registration present.", "flappin-singleton already registered");
441 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
444 flappingSingletonService = new FlappingSingletonService(singletonService);
446 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
/**
 * Performs a final read of {@code WriteTransactionsHandler.ID_INT_YID} from the
 * CONFIGURATION datastore and reports whether the data tree change listener's view matches
 * it, as judged by {@code idIntsListener.checkEqual}.
 *
 * @return an already-completed future: failed with "Dtcl missing." when no listener or
 *         registration exists, failed with "Final read empty." when the read yields no
 *         data, failed with "Read failed." when the read throws, otherwise a successful
 *         result whose copy-matches flag is the outcome of the comparison
 */
450 public Future<RpcResult<UnsubscribeDtclOutput>> unsubscribeDtcl() {
451 LOG.debug("Received unsubscribe-dtcl");
453 if (idIntsListener == null || dtclReg == null) {
454 final RpcError error = RpcResultBuilder.newError(
455 ErrorType.RPC, "Dtcl missing.", "No DataTreeChangeListener registered.");
456 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed().withRpcError(error).build());
// NOTE(review): no close() of this read-only transaction is visible in this method -
// confirm it is released somewhere.
459 final DOMDataReadOnlyTransaction rTx = domDataBroker.newReadOnlyTransaction();
// NOTE(review): the body of this branch is not visible here; presumably it closes and
// clears dtclReg before the final read - confirm.
461 if (dtclReg != null) {
// checkedGet() waits for the read to finish and throws ReadFailedException on failure.
468 final Optional<NormalizedNode<?, ?>> readResult =
469 rTx.read(CONTROLLER_CONFIG, WriteTransactionsHandler.ID_INT_YID).checkedGet();
471 if (!readResult.isPresent()) {
472 final RpcError error = RpcResultBuilder.newError(
473 ErrorType.APPLICATION, "Final read empty.", "No data read from id-ints list.");
474 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
475 .withRpcError(error).build());
478 return Futures.immediateFuture(
479 RpcResultBuilder.success(new UnsubscribeDtclOutputBuilder()
480 .setCopyMatches(idIntsListener.checkEqual(readResult.get()))).build());
// The caught exception is not logged or attached; only a generic error is returned.
482 } catch (final ReadFailedException e) {
483 final RpcError error = RpcResultBuilder.newError(
484 ErrorType.APPLICATION, "Read failed.", "Final read from id-ints failed.");
485 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
486 .withRpcError(error).build());
/**
 * Logs the request and delegates to {@code prefixShardHandler.onCreatePrefixShard}.
 *
 * @param input request forwarded unchanged to the handler
 * @return whatever future the handler returns
 */
492 public Future<RpcResult<Void>> createPrefixShard(final CreatePrefixShardInput input) {
493 LOG.debug("create-prefix-shard, input: {}", input);
495 return prefixShardHandler.onCreatePrefixShard(input);
// Takes no input and returns a future of RpcResult<Void>.
// NOTE(review): the method body is not visible here, so its behaviour is left undocumented.
499 public Future<RpcResult<Void>> deconfigureIdIntsShard() {
/**
 * Removes the YNL listener registered under the input id, collects the listener's output
 * and closes the registration.
 *
 * @param input request carrying the listener id
 * @return an already-completed future: failed with "missing-registration" when no listener
 *         is stored for the id, otherwise a successful result carrying the listener output
 */
504 public Future<RpcResult<UnsubscribeYnlOutput>> unsubscribeYnl(final UnsubscribeYnlInput input) {
505 LOG.debug("Received unsubscribe-ynl, input: {}", input);
507 if (!ynlRegistrations.containsKey(input.getId())) {
508 final RpcError rpcError = RpcResultBuilder
509 .newError(ErrorType.APPLICATION, "missing-registration", "No ynl listener with this id registered.");
510 final RpcResult<UnsubscribeYnlOutput> result =
511 RpcResultBuilder.<UnsubscribeYnlOutput>failed().withRpcError(rpcError).build();
512 return Futures.immediateFuture(result);
// The output is taken from the listener before the registration is closed.
// This local shadows the 'registration' field of the class.
515 final ListenerRegistration<YnlListener> registration = ynlRegistrations.remove(input.getId());
516 final UnsubscribeYnlOutput output = registration.getInstance().getOutput();
518 registration.close();
520 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeYnlOutput>success().withResult(output).build());
/**
 * Reports the state of the publish task stored under the input id: whether it is still
 * active, how many notifications it has published and its last error, if any.
 *
 * @param input request carrying the task id
 * @return an already-completed successful future carrying the status output
 */
524 public Future<RpcResult<CheckPublishNotificationsOutput>> checkPublishNotifications(
525 final CheckPublishNotificationsInput input) {
527 final PublishNotificationsTask task = publishNotificationsTasks.get(input.getId());
// NOTE(review): the condition guarding this early return is not visible here; presumably
// it is the "no task stored for this id" case - confirm.
530 return Futures.immediateFuture(RpcResultBuilder.success(
531 new CheckPublishNotificationsOutputBuilder().setActive(false)).build());
// The task counts as active until it reports itself finished.
534 final CheckPublishNotificationsOutputBuilder checkPublishNotificationsOutputBuilder =
535 new CheckPublishNotificationsOutputBuilder().setActive(!task.isFinished());
// The last error is rendered as its toString() immediately followed by its stack trace.
537 if (task.getLastError() != null) {
538 final StringWriter sw = new StringWriter();
539 final PrintWriter pw = new PrintWriter(sw);
540 task.getLastError().printStackTrace(pw);
541 checkPublishNotificationsOutputBuilder.setLastError(task.getLastError().toString() + sw.toString());
544 final CheckPublishNotificationsOutput output =
545 checkPublishNotificationsOutputBuilder.setPublishCount(task.getCurrentNotif()).build();
547 return Futures.immediateFuture(RpcResultBuilder.success(output).build());
/**
 * Builds a {@code ProduceTransactionsHandler} over the DOM data tree service for the given
 * input and starts it with a new settable future.
 *
 * @param input request forwarded unchanged to the handler
 * @return the settable future passed to {@code start}; presumably the handler completes
 *         it when its work is done (not visible in this file)
 */
551 public Future<RpcResult<ProduceTransactionsOutput>> produceTransactions(final ProduceTransactionsInput input) {
552 LOG.debug("producer-transactions, input: {}", input);
554 final ProduceTransactionsHandler handler =
555 new ProduceTransactionsHandler(domDataTreeService, input);
557 final SettableFuture<RpcResult<ProduceTransactionsOutput>> settableFuture = SettableFuture.create();
558 handler.start(settableFuture);
560 return settableFuture;
/**
 * Registers the global get-constant service for the constant carried by the input.
 *
 * @param input request carrying the constant; a null constant is rejected
 * @return an already-completed future: failed with "Invalid input." when the constant is
 *         null, failed with "Registration present." when a global registration already
 *         exists, otherwise successful
 */
564 public Future<RpcResult<Void>> registerConstant(final RegisterConstantInput input) {
566 LOG.debug("Received register-constant rpc, input: {}", input);
568 if (input.getConstant() == null) {
569 final RpcError error = RpcResultBuilder.newError(
570 ErrorType.RPC, "Invalid input.", "Constant value is null");
571 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
// Only one global registration at a time; unregisterConstant clears the field again.
574 if (globalGetConstantRegistration != null) {
575 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
576 "There is already a get-constant rpc registered.");
577 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
580 globalGetConstantRegistration = GetConstantService.registerNew(domRpcService, input.getConstant());
581 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
// Takes no input and returns a future of RpcResult<Void>.
// NOTE(review): the method body is not visible here, so its behaviour is left undocumented.
585 public Future<RpcResult<Void>> unregisterDefaultConstant() {
/**
 * Performs a final read of the id-ints data through a temporary datastore client actor and
 * reports whether the DOMDataTreeListener's view matches it, as judged by
 * {@code idIntsDdtl.checkEqual}.
 *
 * @return an already-completed future: failed with "Ddtl missing." when no listener or
 *         registration exists, failed with "Unable to create ds client for read." when the
 *         client cannot be obtained, failed with "Read failed." when the read is empty or
 *         throws, otherwise a successful result whose copy-matches flag is the outcome of
 *         the comparison
 */
590 public Future<RpcResult<UnsubscribeDdtlOutput>> unsubscribeDdtl() {
591 LOG.debug("Received unsubscribe-ddtl.");
593 if (idIntsDdtl == null || ddtlReg == null) {
594 final RpcError error = RpcResultBuilder.newError(
595 ErrorType.RPC, "Ddtl missing.", "No DOMDataTreeListener registered.");
596 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed().withRpcError(error).build());
// NOTE(review): no statement closing or clearing ddtlReg is visible in this method (some
// lines are missing from view) - confirm the listener is actually unregistered.
// NOTE(review): the shard name is derived from ID_INTS_YID while the read below uses
// ID_INT - confirm both identifiers are the intended ones.
602 final String shardName = ClusterUtils.getCleanShardName(ProduceTransactionsHandler.ID_INTS_YID);
603 LOG.debug("Creating distributed datastore client for shard {}", shardName);
605 final ActorContext actorContext = configDataStore.getActorContext();
606 final Props distributedDataStoreClientProps =
607 SimpleDataStoreClientActor.props(actorContext.getCurrentMemberName(),
608 "Shard-" + shardName, actorContext, shardName);
// A dedicated client actor is created for this call.
610 final ActorRef clientActor = actorSystem.actorOf(distributedDataStoreClientProps);
611 final DataStoreClient distributedDataStoreClient;
// The client lookup is given a 30 second timeout.
613 distributedDataStoreClient = SimpleDataStoreClientActor
614 .getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS);
615 } catch (final Exception e) {
// On failure the freshly created actor is stopped before the error is returned.
616 LOG.error("Failed to get actor for {}", distributedDataStoreClientProps, e);
617 clientActor.tell(PoisonPill.getInstance(), noSender());
618 final RpcError error = RpcResultBuilder.newError(
619 ErrorType.APPLICATION, "Unable to create ds client for read.",
620 "Unable to create ds client for read.");
621 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
622 .withRpcError(error).build());
625 final ClientLocalHistory localHistory = distributedDataStoreClient.createLocalHistory();
626 final ClientTransaction tx = localHistory.createTransaction();
627 final CheckedFuture<Optional<NormalizedNode<?, ?>>,
628 org.opendaylight.mdsal.common.api.ReadFailedException> read =
629 tx.read(YangInstanceIdentifier.of(ProduceTransactionsHandler.ID_INT));
// The local history is closed once the read has been issued; the result is awaited below.
632 localHistory.close();
634 final Optional<NormalizedNode<?, ?>> optional = read.checkedGet();
635 if (!optional.isPresent()) {
636 LOG.warn("Final read from client is empty.");
637 final RpcError error = RpcResultBuilder.newError(
638 ErrorType.APPLICATION, "Read failed.", "Final read from id-ints is empty.");
639 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
640 .withRpcError(error).build());
643 return Futures.immediateFuture(
644 RpcResultBuilder.success(new UnsubscribeDdtlOutputBuilder()
645 .setCopyMatches(idIntsDdtl.checkEqual(optional.get()))).build());
647 } catch (org.opendaylight.mdsal.common.api.ReadFailedException e) {
648 LOG.error("Unable to read data to verify ddtl data.", e);
649 final RpcError error = RpcResultBuilder.newError(
650 ErrorType.APPLICATION, "Read failed.", "Final read from id-ints failed.");
651 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
652 .withRpcError(error).build());
// NOTE(review): the two cleanup statements below presumably sit in a finally block whose
// opener is not visible here - confirm they run on every exit path.
654 distributedDataStoreClient.close();
655 clientActor.tell(PoisonPill.getInstance(), noSender());