2 * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.clustering.it.provider;
11 import static akka.actor.ActorRef.noSender;
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSystem;
15 import akka.actor.PoisonPill;
16 import akka.actor.Props;
17 import com.google.common.base.Optional;
18 import com.google.common.util.concurrent.CheckedFuture;
19 import com.google.common.util.concurrent.Futures;
20 import com.google.common.util.concurrent.SettableFuture;
21 import java.io.PrintWriter;
22 import java.io.StringWriter;
23 import java.util.Collections;
24 import java.util.HashMap;
26 import java.util.concurrent.Future;
27 import java.util.concurrent.TimeUnit;
28 import org.opendaylight.controller.cluster.ActorSystemProvider;
29 import org.opendaylight.controller.cluster.databroker.actors.dds.ClientLocalHistory;
30 import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
31 import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
32 import org.opendaylight.controller.cluster.databroker.actors.dds.SimpleDataStoreClientActor;
33 import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
34 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
35 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
36 import org.opendaylight.controller.cluster.sharding.DistributedShardFactory;
37 import org.opendaylight.controller.clustering.it.provider.impl.FlappingSingletonService;
38 import org.opendaylight.controller.clustering.it.provider.impl.GetConstantService;
39 import org.opendaylight.controller.clustering.it.provider.impl.IdIntsDOMDataTreeLIstener;
40 import org.opendaylight.controller.clustering.it.provider.impl.IdIntsListener;
41 import org.opendaylight.controller.clustering.it.provider.impl.PrefixLeaderHandler;
42 import org.opendaylight.controller.clustering.it.provider.impl.PrefixShardHandler;
43 import org.opendaylight.controller.clustering.it.provider.impl.ProduceTransactionsHandler;
44 import org.opendaylight.controller.clustering.it.provider.impl.PublishNotificationsTask;
45 import org.opendaylight.controller.clustering.it.provider.impl.RoutedGetConstantService;
46 import org.opendaylight.controller.clustering.it.provider.impl.SingletonGetConstantService;
47 import org.opendaylight.controller.clustering.it.provider.impl.WriteTransactionsHandler;
48 import org.opendaylight.controller.clustering.it.provider.impl.YnlListener;
49 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
50 import org.opendaylight.controller.md.sal.binding.api.NotificationService;
51 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
52 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
53 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
54 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
55 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeService;
56 import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationRegistration;
57 import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService;
58 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
59 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
60 import org.opendaylight.controller.sal.core.api.model.SchemaService;
61 import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
62 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
63 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
64 import org.opendaylight.mdsal.dom.api.DOMDataTreeLoopException;
65 import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
66 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
67 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
68 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.AddShardReplicaInput;
69 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomePrefixLeaderInput;
70 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsInput;
71 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutput;
72 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutputBuilder;
73 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CreatePrefixShardInput;
74 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.IsClientAbortedOutput;
75 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.OdlMdsalLowlevelControlService;
76 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsInput;
77 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutput;
78 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantInput;
79 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantInput;
80 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterDefaultConstantInput;
81 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantInput;
82 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardInput;
83 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemoveShardReplicaInput;
84 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsInput;
85 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlInput;
86 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantInput;
87 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutput;
88 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutputBuilder;
89 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutput;
90 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutputBuilder;
91 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutput;
92 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutputBuilder;
93 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlInput;
94 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlOutput;
95 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsInput;
96 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsOutput;
97 import org.opendaylight.yangtools.concepts.ListenerRegistration;
98 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
99 import org.opendaylight.yangtools.yang.common.RpcError;
100 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
101 import org.opendaylight.yangtools.yang.common.RpcResult;
102 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
103 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
104 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
105 import org.slf4j.Logger;
106 import org.slf4j.LoggerFactory;
108 public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService {
110 private static final Logger LOG = LoggerFactory.getLogger(MdsalLowLevelTestProvider.class);
111 private static final org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType CONTROLLER_CONFIG =
112 org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;
114 private final RpcProviderRegistry rpcRegistry;
115 private final BindingAwareBroker.RpcRegistration<OdlMdsalLowlevelControlService> registration;
116 private final DistributedShardFactory distributedShardFactory;
117 private final DistributedDataStoreInterface configDataStore;
118 private final DOMDataTreeService domDataTreeService;
119 private final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer;
120 private final DOMDataBroker domDataBroker;
121 private final NotificationPublishService notificationPublishService;
122 private final NotificationService notificationService;
123 private final SchemaService schemaService;
124 private final ClusterSingletonServiceProvider singletonService;
125 private final DOMRpcProviderService domRpcService;
126 private final PrefixLeaderHandler prefixLeaderHandler;
127 private final PrefixShardHandler prefixShardHandler;
128 private final DOMDataTreeChangeService domDataTreeChangeService;
129 private final ActorSystem actorSystem;
131 private Map<InstanceIdentifier<?>, DOMRpcImplementationRegistration<RoutedGetConstantService>> routedRegistrations =
134 private Map<String, ListenerRegistration<YnlListener>> ynlRegistrations = new HashMap<>();
136 private DOMRpcImplementationRegistration<GetConstantService> globalGetConstantRegistration = null;
137 private ClusterSingletonServiceRegistration getSingletonConstantRegistration;
138 private FlappingSingletonService flappingSingletonService;
139 private ListenerRegistration<DOMDataTreeChangeListener> dtclReg;
140 private IdIntsListener idIntsListener;
141 private Map<String, PublishNotificationsTask> publishNotificationsTasks = new HashMap<>();
142 private ListenerRegistration<IdIntsDOMDataTreeLIstener> ddtlReg;
143 private IdIntsDOMDataTreeLIstener idIntsDdtl;
147 public MdsalLowLevelTestProvider(final RpcProviderRegistry rpcRegistry,
148 final DOMRpcProviderService domRpcService,
149 final ClusterSingletonServiceProvider singletonService,
150 final SchemaService schemaService,
151 final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer,
152 final NotificationPublishService notificationPublishService,
153 final NotificationService notificationService,
154 final DOMDataBroker domDataBroker,
155 final DOMDataTreeService domDataTreeService,
156 final DistributedShardFactory distributedShardFactory,
157 final DistributedDataStoreInterface configDataStore,
158 final ActorSystemProvider actorSystemProvider) {
159 this.rpcRegistry = rpcRegistry;
160 this.domRpcService = domRpcService;
161 this.singletonService = singletonService;
162 this.schemaService = schemaService;
163 this.bindingNormalizedNodeSerializer = bindingNormalizedNodeSerializer;
164 this.notificationPublishService = notificationPublishService;
165 this.notificationService = notificationService;
166 this.domDataBroker = domDataBroker;
167 this.domDataTreeService = domDataTreeService;
168 this.distributedShardFactory = distributedShardFactory;
169 this.configDataStore = configDataStore;
170 this.actorSystem = actorSystemProvider.getActorSystem();
172 this.prefixLeaderHandler = new PrefixLeaderHandler(domDataTreeService, bindingNormalizedNodeSerializer);
174 domDataTreeChangeService =
175 (DOMDataTreeChangeService) domDataBroker.getSupportedExtensions().get(DOMDataTreeChangeService.class);
177 registration = rpcRegistry.addRpcImplementation(OdlMdsalLowlevelControlService.class, this);
179 prefixShardHandler = new PrefixShardHandler(distributedShardFactory, domDataTreeService,
180 bindingNormalizedNodeSerializer);
184 public Future<RpcResult<Void>> unregisterSingletonConstant() {
185 LOG.debug("unregister-singleton-constant");
187 if (getSingletonConstantRegistration == null) {
188 LOG.debug("No get-singleton-constant registration present.");
189 final RpcError rpcError = RpcResultBuilder
190 .newError(ErrorType.APPLICATION, "missing-registration", "No get-singleton-constant rpc registration present.");
191 final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
192 return Futures.immediateFuture(result);
196 getSingletonConstantRegistration.close();
197 getSingletonConstantRegistration = null;
199 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
200 } catch (final Exception e) {
201 LOG.debug("There was a problem closing the singleton constant service", e);
202 final RpcError rpcError = RpcResultBuilder
203 .newError(ErrorType.APPLICATION, "error-closing", "There was a problem closing get-singleton-constant");
204 final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
205 return Futures.immediateFuture(result);
210 public Future<RpcResult<Void>> startPublishNotifications(final StartPublishNotificationsInput input) {
211 LOG.debug("publish-notifications, input: {}", input);
213 final PublishNotificationsTask task = new PublishNotificationsTask(notificationPublishService, input.getId(),
214 input.getSeconds(), input.getNotificationsPerSecond());
216 publishNotificationsTasks.put(input.getId(), task);
220 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
224 public Future<RpcResult<Void>> subscribeDtcl() {
226 if (dtclReg != null) {
227 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
228 "There is already dataTreeChangeListener registered on id-ints list.");
229 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
232 idIntsListener = new IdIntsListener();
234 dtclReg = domDataTreeChangeService
235 .registerDataTreeChangeListener(
236 new org.opendaylight.controller.md.sal.dom.api.DOMDataTreeIdentifier(
237 CONTROLLER_CONFIG, WriteTransactionsHandler.ID_INT_YID),
240 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
244 public Future<RpcResult<WriteTransactionsOutput>> writeTransactions(final WriteTransactionsInput input) {
245 LOG.debug("write-transactions, input: {}", input);
247 final WriteTransactionsHandler writeTransactionsHandler = new WriteTransactionsHandler(domDataBroker, input);
249 final SettableFuture<RpcResult<WriteTransactionsOutput>> settableFuture = SettableFuture.create();
250 writeTransactionsHandler.start(settableFuture);
252 return settableFuture;
256 public Future<RpcResult<IsClientAbortedOutput>> isClientAborted() {
261 public Future<RpcResult<Void>> removeShardReplica(RemoveShardReplicaInput input) {
266 public Future<RpcResult<Void>> subscribeYnl(final SubscribeYnlInput input) {
268 LOG.debug("subscribe-ynl, input: {}", input);
270 if (ynlRegistrations.containsKey(input.getId())) {
271 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
272 "There is already ynl listener registered for this id: " + input.getId());
273 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
276 ynlRegistrations.put(input.getId(),
277 notificationService.registerNotificationListener(new YnlListener(input.getId())));
279 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
283 public Future<RpcResult<Void>> removePrefixShard(final RemovePrefixShardInput input) {
284 LOG.debug("remove-prefix-shard, input: {}", input);
286 return prefixShardHandler.onRemovePrefixShard(input);
290 public Future<RpcResult<Void>> becomePrefixLeader(final BecomePrefixLeaderInput input) {
291 LOG.debug("become-prefix-leader, input: {}", input);
293 return prefixLeaderHandler.makeLeaderLocal(input);
297 public Future<RpcResult<Void>> unregisterBoundConstant(final UnregisterBoundConstantInput input) {
298 LOG.debug("unregister-bound-constant, {}", input);
300 final DOMRpcImplementationRegistration<RoutedGetConstantService> registration =
301 routedRegistrations.remove(input.getContext());
303 if (registration == null) {
304 LOG.debug("No get-contexted-constant registration for context: {}", input.getContext());
305 final RpcError rpcError = RpcResultBuilder
306 .newError(ErrorType.APPLICATION, "missing-registration", "No get-constant rpc registration present.");
307 final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
308 return Futures.immediateFuture(result);
311 registration.close();
312 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
316 public Future<RpcResult<Void>> registerSingletonConstant(final RegisterSingletonConstantInput input) {
318 LOG.debug("Received register-singleton-constant rpc, input: {}", input);
320 if (input.getConstant() == null) {
321 final RpcError error = RpcResultBuilder.newError(
322 ErrorType.RPC, "Invalid input.", "Constant value is null");
323 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
326 getSingletonConstantRegistration =
327 SingletonGetConstantService.registerNew(singletonService, domRpcService, input.getConstant());
329 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
333 public Future<RpcResult<Void>> registerDefaultConstant(RegisterDefaultConstantInput input) {
338 public Future<RpcResult<Void>> unregisterConstant() {
340 if (globalGetConstantRegistration == null) {
341 final RpcError rpcError = RpcResultBuilder
342 .newError(ErrorType.APPLICATION, "missing-registration", "No get-constant rpc registration present.");
343 final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
344 return Futures.immediateFuture(result);
347 globalGetConstantRegistration.close();
348 globalGetConstantRegistration = null;
350 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
354 public Future<RpcResult<UnregisterFlappingSingletonOutput>> unregisterFlappingSingleton() {
355 LOG.debug("unregister-flapping-singleton received.");
357 if (flappingSingletonService == null) {
358 final RpcError rpcError = RpcResultBuilder
359 .newError(ErrorType.APPLICATION, "missing-registration", "No flapping-singleton registration present.");
360 final RpcResult<UnregisterFlappingSingletonOutput> result =
361 RpcResultBuilder.<UnregisterFlappingSingletonOutput>failed().withRpcError(rpcError).build();
362 return Futures.immediateFuture(result);
365 final long flapCount = flappingSingletonService.setInactive();
366 flappingSingletonService = null;
368 final UnregisterFlappingSingletonOutput output =
369 new UnregisterFlappingSingletonOutputBuilder().setFlapCount(flapCount).build();
371 return Futures.immediateFuture(RpcResultBuilder.success(output).build());
375 public Future<RpcResult<Void>> addShardReplica(AddShardReplicaInput input) {
380 public Future<RpcResult<Void>> subscribeDdtl() {
382 if (ddtlReg != null) {
383 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
384 "There is already dataTreeChangeListener registered on id-ints list.");
385 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
388 idIntsDdtl = new IdIntsDOMDataTreeLIstener();
392 domDataTreeService.registerListener(idIntsDdtl,
393 Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION,
394 ProduceTransactionsHandler.ID_INT_YID))
395 , true, Collections.emptyList());
396 } catch (DOMDataTreeLoopException e) {
397 LOG.error("Failed to register DOMDataTreeListener.", e);
401 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
405 public Future<RpcResult<Void>> registerBoundConstant(final RegisterBoundConstantInput input) {
406 LOG.debug("register-bound-constant: {}", input);
408 if (input.getContext() == null) {
409 final RpcError error = RpcResultBuilder.newError(
410 ErrorType.RPC, "Invalid input.", "Context value is null");
411 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
414 if (input.getConstant() == null) {
415 final RpcError error = RpcResultBuilder.newError(
416 ErrorType.RPC, "Invalid input.", "Constant value is null");
417 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
420 if (routedRegistrations.containsKey(input.getContext())) {
421 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
422 "There is already a rpc registered for context: " + input.getContext());
423 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
426 final DOMRpcImplementationRegistration<RoutedGetConstantService> registration =
427 RoutedGetConstantService.registerNew(bindingNormalizedNodeSerializer, domRpcService,
428 input.getConstant(), input.getContext());
430 routedRegistrations.put(input.getContext(), registration);
431 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
435 public Future<RpcResult<Void>> registerFlappingSingleton() {
436 LOG.debug("Received register-flapping-singleton.");
438 if (flappingSingletonService != null) {
439 final RpcError error = RpcResultBuilder.newError(
440 ErrorType.RPC, "Registration present.", "flapping-singleton already registered");
441 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
444 flappingSingletonService = new FlappingSingletonService(singletonService);
446 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
450 public Future<RpcResult<UnsubscribeDtclOutput>> unsubscribeDtcl() {
451 LOG.debug("Received unsubscribe-dtcl");
453 if (idIntsListener == null || dtclReg == null) {
454 final RpcError error = RpcResultBuilder.newError(
455 ErrorType.RPC, "Dtcl missing.", "No DataTreeChangeListener registered.");
456 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed().withRpcError(error).build());
462 if (!idIntsListener.hasTriggered()) {
463 final RpcError error = RpcResultBuilder.newError(
464 ErrorType.APPLICATION, "No notification received.", "id-ints listener has not received" +
465 "any notifications.");
466 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
467 .withRpcError(error).build());
470 final DOMDataReadOnlyTransaction rTx = domDataBroker.newReadOnlyTransaction();
472 final Optional<NormalizedNode<?, ?>> readResult =
473 rTx.read(CONTROLLER_CONFIG, WriteTransactionsHandler.ID_INT_YID).checkedGet();
475 if (!readResult.isPresent()) {
476 final RpcError error = RpcResultBuilder.newError(
477 ErrorType.APPLICATION, "Final read empty.", "No data read from id-ints list.");
478 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
479 .withRpcError(error).build());
482 return Futures.immediateFuture(
483 RpcResultBuilder.success(new UnsubscribeDtclOutputBuilder()
484 .setCopyMatches(idIntsListener.checkEqual(readResult.get()))).build());
486 } catch (final ReadFailedException e) {
487 final RpcError error = RpcResultBuilder.newError(
488 ErrorType.APPLICATION, "Read failed.", "Final read from id-ints failed.");
489 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
490 .withRpcError(error).build());
496 public Future<RpcResult<Void>> createPrefixShard(final CreatePrefixShardInput input) {
497 LOG.debug("create-prefix-shard, input: {}", input);
499 return prefixShardHandler.onCreatePrefixShard(input);
503 public Future<RpcResult<Void>> deconfigureIdIntsShard() {
508 public Future<RpcResult<UnsubscribeYnlOutput>> unsubscribeYnl(final UnsubscribeYnlInput input) {
509 LOG.debug("Received unsubscribe-ynl, input: {}", input);
511 if (!ynlRegistrations.containsKey(input.getId())) {
512 final RpcError rpcError = RpcResultBuilder
513 .newError(ErrorType.APPLICATION, "missing-registration", "No ynl listener with this id registered.");
514 final RpcResult<UnsubscribeYnlOutput> result =
515 RpcResultBuilder.<UnsubscribeYnlOutput>failed().withRpcError(rpcError).build();
516 return Futures.immediateFuture(result);
519 final ListenerRegistration<YnlListener> registration = ynlRegistrations.remove(input.getId());
520 final UnsubscribeYnlOutput output = registration.getInstance().getOutput();
522 registration.close();
524 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeYnlOutput>success().withResult(output).build());
528 public Future<RpcResult<CheckPublishNotificationsOutput>> checkPublishNotifications(
529 final CheckPublishNotificationsInput input) {
531 final PublishNotificationsTask task = publishNotificationsTasks.get(input.getId());
534 return Futures.immediateFuture(RpcResultBuilder.success(
535 new CheckPublishNotificationsOutputBuilder().setActive(false)).build());
538 final CheckPublishNotificationsOutputBuilder checkPublishNotificationsOutputBuilder =
539 new CheckPublishNotificationsOutputBuilder().setActive(!task.isFinished());
541 if (task.getLastError() != null) {
542 final StringWriter sw = new StringWriter();
543 final PrintWriter pw = new PrintWriter(sw);
544 task.getLastError().printStackTrace(pw);
545 checkPublishNotificationsOutputBuilder.setLastError(task.getLastError().toString() + sw.toString());
548 final CheckPublishNotificationsOutput output =
549 checkPublishNotificationsOutputBuilder.setPublishCount(task.getCurrentNotif()).build();
551 return Futures.immediateFuture(RpcResultBuilder.success(output).build());
555 public Future<RpcResult<ProduceTransactionsOutput>> produceTransactions(final ProduceTransactionsInput input) {
556 LOG.debug("producer-transactions, input: {}", input);
558 final ProduceTransactionsHandler handler =
559 new ProduceTransactionsHandler(domDataTreeService, input);
561 final SettableFuture<RpcResult<ProduceTransactionsOutput>> settableFuture = SettableFuture.create();
562 handler.start(settableFuture);
564 return settableFuture;
568 public Future<RpcResult<Void>> registerConstant(final RegisterConstantInput input) {
570 LOG.debug("Received register-constant rpc, input: {}", input);
572 if (input.getConstant() == null) {
573 final RpcError error = RpcResultBuilder.newError(
574 ErrorType.RPC, "Invalid input.", "Constant value is null");
575 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
578 if (globalGetConstantRegistration != null) {
579 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
580 "There is already a get-constant rpc registered.");
581 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
584 globalGetConstantRegistration = GetConstantService.registerNew(domRpcService, input.getConstant());
585 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
589 public Future<RpcResult<Void>> unregisterDefaultConstant() {
594 public Future<RpcResult<UnsubscribeDdtlOutput>> unsubscribeDdtl() {
595 LOG.debug("Received unsubscribe-ddtl.");
597 if (idIntsDdtl == null || ddtlReg == null) {
598 final RpcError error = RpcResultBuilder.newError(
599 ErrorType.RPC, "Ddtl missing.", "No DOMDataTreeListener registered.");
600 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed().withRpcError(error).build());
606 if (!idIntsDdtl.hasTriggered()) {
607 final RpcError error = RpcResultBuilder.newError(
608 ErrorType.APPLICATION, "No notification received.", "id-ints listener has not received" +
609 "any notifications.");
610 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
611 .withRpcError(error).build());
614 final String shardName = ClusterUtils.getCleanShardName(ProduceTransactionsHandler.ID_INTS_YID);
615 LOG.debug("Creating distributed datastore client for shard {}", shardName);
617 final ActorContext actorContext = configDataStore.getActorContext();
618 final Props distributedDataStoreClientProps =
619 SimpleDataStoreClientActor.props(actorContext.getCurrentMemberName(),
620 "Shard-" + shardName, actorContext, shardName);
622 final ActorRef clientActor = actorSystem.actorOf(distributedDataStoreClientProps);
623 final DataStoreClient distributedDataStoreClient;
625 distributedDataStoreClient = SimpleDataStoreClientActor
626 .getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS);
627 } catch (final Exception e) {
628 LOG.error("Failed to get actor for {}", distributedDataStoreClientProps, e);
629 clientActor.tell(PoisonPill.getInstance(), noSender());
630 final RpcError error = RpcResultBuilder.newError(
631 ErrorType.APPLICATION, "Unable to create ds client for read.",
632 "Unable to create ds client for read.");
633 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
634 .withRpcError(error).build());
637 final ClientLocalHistory localHistory = distributedDataStoreClient.createLocalHistory();
638 final ClientTransaction tx = localHistory.createTransaction();
639 final CheckedFuture<Optional<NormalizedNode<?, ?>>,
640 org.opendaylight.mdsal.common.api.ReadFailedException> read =
641 tx.read(YangInstanceIdentifier.of(ProduceTransactionsHandler.ID_INT));
644 localHistory.close();
646 final Optional<NormalizedNode<?, ?>> optional = read.checkedGet();
647 if (!optional.isPresent()) {
648 LOG.warn("Final read from client is empty.");
649 final RpcError error = RpcResultBuilder.newError(
650 ErrorType.APPLICATION, "Read failed.", "Final read from id-ints is empty.");
651 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
652 .withRpcError(error).build());
655 return Futures.immediateFuture(
656 RpcResultBuilder.success(new UnsubscribeDdtlOutputBuilder()
657 .setCopyMatches(idIntsDdtl.checkEqual(optional.get()))).build());
659 } catch (org.opendaylight.mdsal.common.api.ReadFailedException e) {
660 LOG.error("Unable to read data to verify ddtl data.", e);
661 final RpcError error = RpcResultBuilder.newError(
662 ErrorType.APPLICATION, "Read failed.", "Final read from id-ints failed.");
663 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
664 .withRpcError(error).build());
666 distributedDataStoreClient.close();
667 clientActor.tell(PoisonPill.getInstance(), noSender());