2 * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.clustering.it.provider;
11 import com.google.common.base.Optional;
12 import com.google.common.base.Preconditions;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import com.google.common.util.concurrent.SettableFuture;
16 import java.io.PrintWriter;
17 import java.io.StringWriter;
18 import java.util.Collection;
19 import java.util.Collections;
20 import java.util.HashMap;
22 import java.util.concurrent.ExecutionException;
23 import java.util.concurrent.Future;
24 import javax.annotation.Nonnull;
25 import org.opendaylight.controller.cluster.sharding.DistributedShardFactory;
26 import org.opendaylight.controller.clustering.it.provider.impl.FlappingSingletonService;
27 import org.opendaylight.controller.clustering.it.provider.impl.GetConstantService;
28 import org.opendaylight.controller.clustering.it.provider.impl.IdIntsDOMDataTreeLIstener;
29 import org.opendaylight.controller.clustering.it.provider.impl.IdIntsListener;
30 import org.opendaylight.controller.clustering.it.provider.impl.PrefixShardHandler;
31 import org.opendaylight.controller.clustering.it.provider.impl.ProduceTransactionsHandler;
32 import org.opendaylight.controller.clustering.it.provider.impl.PublishNotificationsTask;
33 import org.opendaylight.controller.clustering.it.provider.impl.RoutedGetConstantService;
34 import org.opendaylight.controller.clustering.it.provider.impl.SingletonGetConstantService;
35 import org.opendaylight.controller.clustering.it.provider.impl.WriteTransactionsHandler;
36 import org.opendaylight.controller.clustering.it.provider.impl.YnlListener;
37 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
38 import org.opendaylight.controller.md.sal.binding.api.NotificationService;
39 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
40 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
41 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
42 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
43 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeService;
44 import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationRegistration;
45 import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService;
46 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
47 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
48 import org.opendaylight.controller.sal.core.api.model.SchemaService;
49 import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
50 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
51 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
52 import org.opendaylight.mdsal.dom.api.DOMDataTreeListener;
53 import org.opendaylight.mdsal.dom.api.DOMDataTreeListeningException;
54 import org.opendaylight.mdsal.dom.api.DOMDataTreeLoopException;
55 import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
56 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
57 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
58 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.AddShardReplicaInput;
59 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomePrefixLeaderInput;
60 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsInput;
61 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutput;
62 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutputBuilder;
63 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CreatePrefixShardInput;
64 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.IsClientAbortedOutput;
65 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.OdlMdsalLowlevelControlService;
66 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsInput;
67 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutput;
68 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantInput;
69 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantInput;
70 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterDefaultConstantInput;
71 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantInput;
72 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardInput;
73 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemoveShardReplicaInput;
74 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsInput;
75 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlInput;
76 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantInput;
77 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutput;
78 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutputBuilder;
79 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutput;
80 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutputBuilder;
81 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutput;
82 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutputBuilder;
83 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlInput;
84 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlOutput;
85 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsInput;
86 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsOutput;
87 import org.opendaylight.yangtools.concepts.ListenerRegistration;
88 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
89 import org.opendaylight.yangtools.yang.common.RpcError;
90 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
91 import org.opendaylight.yangtools.yang.common.RpcResult;
92 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
93 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
94 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
95 import org.slf4j.Logger;
96 import org.slf4j.LoggerFactory;
98 public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService {
100 private static final Logger LOG = LoggerFactory.getLogger(MdsalLowLevelTestProvider.class);
101 private static final org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType CONTROLLER_CONFIG =
102 org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;
104 private final RpcProviderRegistry rpcRegistry;
105 private final BindingAwareBroker.RpcRegistration<OdlMdsalLowlevelControlService> registration;
106 private final DistributedShardFactory distributedShardFactory;
107 private final DOMDataTreeService domDataTreeService;
108 private final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer;
109 private final DOMDataBroker domDataBroker;
110 private final NotificationPublishService notificationPublishService;
111 private final NotificationService notificationService;
112 private final SchemaService schemaService;
113 private final ClusterSingletonServiceProvider singletonService;
114 private final DOMRpcProviderService domRpcService;
115 private final PrefixShardHandler prefixShardHandler;
116 private final DOMDataTreeChangeService domDataTreeChangeService;
118 private Map<InstanceIdentifier<?>, DOMRpcImplementationRegistration<RoutedGetConstantService>> routedRegistrations =
121 private Map<String, ListenerRegistration<YnlListener>> ynlRegistrations = new HashMap<>();
123 private DOMRpcImplementationRegistration<GetConstantService> globalGetConstantRegistration = null;
124 private ClusterSingletonServiceRegistration getSingletonConstantRegistration;
125 private FlappingSingletonService flappingSingletonService;
126 private ListenerRegistration<DOMDataTreeChangeListener> dtclReg;
127 private IdIntsListener idIntsListener;
128 private Map<String, PublishNotificationsTask> publishNotificationsTasks = new HashMap<>();
129 private ListenerRegistration<IdIntsDOMDataTreeLIstener> ddtlReg;
130 private IdIntsDOMDataTreeLIstener idIntsDdtl;
134 public MdsalLowLevelTestProvider(final RpcProviderRegistry rpcRegistry,
135 final DOMRpcProviderService domRpcService,
136 final ClusterSingletonServiceProvider singletonService,
137 final SchemaService schemaService,
138 final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer,
139 final NotificationPublishService notificationPublishService,
140 final NotificationService notificationService,
141 final DOMDataBroker domDataBroker,
142 final DOMDataTreeService domDataTreeService,
143 final DistributedShardFactory distributedShardFactory) {
144 this.rpcRegistry = rpcRegistry;
145 this.domRpcService = domRpcService;
146 this.singletonService = singletonService;
147 this.schemaService = schemaService;
148 this.bindingNormalizedNodeSerializer = bindingNormalizedNodeSerializer;
149 this.notificationPublishService = notificationPublishService;
150 this.notificationService = notificationService;
151 this.domDataBroker = domDataBroker;
152 this.domDataTreeService = domDataTreeService;
153 this.distributedShardFactory = distributedShardFactory;
155 domDataTreeChangeService =
156 (DOMDataTreeChangeService) domDataBroker.getSupportedExtensions().get(DOMDataTreeChangeService.class);
158 registration = rpcRegistry.addRpcImplementation(OdlMdsalLowlevelControlService.class, this);
160 prefixShardHandler = new PrefixShardHandler(distributedShardFactory, bindingNormalizedNodeSerializer);
164 public Future<RpcResult<Void>> unregisterSingletonConstant() {
165 LOG.debug("unregister-singleton-constant");
167 if (getSingletonConstantRegistration == null) {
168 LOG.debug("No get-singleton-constant registration present.");
169 final RpcError rpcError = RpcResultBuilder
170 .newError(ErrorType.APPLICATION, "missing-registration", "No get-singleton-constant rpc registration present.");
171 final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
172 return Futures.immediateFuture(result);
176 getSingletonConstantRegistration.close();
177 getSingletonConstantRegistration = null;
179 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
180 } catch (final Exception e) {
181 LOG.debug("There was a problem closing the singleton constant service", e);
182 final RpcError rpcError = RpcResultBuilder
183 .newError(ErrorType.APPLICATION, "error-closing", "There was a problem closing get-singleton-constant");
184 final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
185 return Futures.immediateFuture(result);
190 public Future<RpcResult<Void>> startPublishNotifications(final StartPublishNotificationsInput input) {
191 LOG.debug("publish-notifications, input: {}", input);
193 final PublishNotificationsTask task = new PublishNotificationsTask(notificationPublishService, input.getId(),
194 input.getSeconds(), input.getNotificationsPerSecond());
196 publishNotificationsTasks.put(input.getId(), task);
200 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
204 public Future<RpcResult<Void>> subscribeDtcl() {
206 if (dtclReg != null) {
207 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
208 "There is already dataTreeChangeListener registered on id-ints list.");
209 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
212 idIntsListener = new IdIntsListener();
214 dtclReg = domDataTreeChangeService
215 .registerDataTreeChangeListener(
216 new org.opendaylight.controller.md.sal.dom.api.DOMDataTreeIdentifier(
217 CONTROLLER_CONFIG, WriteTransactionsHandler.ID_INTS_YID),
220 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
224 public Future<RpcResult<WriteTransactionsOutput>> writeTransactions(final WriteTransactionsInput input) {
225 LOG.debug("write-transactions, input: {}", input);
227 final WriteTransactionsHandler writeTransactionsHandler = new WriteTransactionsHandler(domDataBroker, input);
229 final SettableFuture<RpcResult<WriteTransactionsOutput>> settableFuture = SettableFuture.create();
230 writeTransactionsHandler.start(settableFuture);
232 return settableFuture;
236 public Future<RpcResult<IsClientAbortedOutput>> isClientAborted() {
241 public Future<RpcResult<Void>> removeShardReplica(RemoveShardReplicaInput input) {
246 public Future<RpcResult<Void>> subscribeYnl(final SubscribeYnlInput input) {
248 LOG.debug("subscribe-ynl, input: {}", input);
250 if (ynlRegistrations.containsKey(input.getId())) {
251 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
252 "There is already ynl listener registered for this id: " + input.getId());
253 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
256 ynlRegistrations.put(input.getId(),
257 notificationService.registerNotificationListener(new YnlListener(input.getId())));
259 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
263 public Future<RpcResult<Void>> removePrefixShard(final RemovePrefixShardInput input) {
264 LOG.debug("remove-prefix-shard, input: {}", input);
266 return prefixShardHandler.onRemovePrefixShard(input);
270 public Future<RpcResult<Void>> becomePrefixLeader(final BecomePrefixLeaderInput input) {
275 public Future<RpcResult<Void>> unregisterBoundConstant(final UnregisterBoundConstantInput input) {
276 LOG.debug("unregister-bound-constant, {}", input);
278 final DOMRpcImplementationRegistration<RoutedGetConstantService> registration =
279 routedRegistrations.remove(input.getContext());
281 if (registration == null) {
282 LOG.debug("No get-contexted-constant registration for context: {}", input.getContext());
283 final RpcError rpcError = RpcResultBuilder
284 .newError(ErrorType.APPLICATION, "missing-registration", "No get-constant rpc registration present.");
285 final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
286 return Futures.immediateFuture(result);
289 registration.close();
290 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
294 public Future<RpcResult<Void>> registerSingletonConstant(final RegisterSingletonConstantInput input) {
296 LOG.debug("Received register-singleton-constant rpc, input: {}", input);
298 if (input.getConstant() == null) {
299 final RpcError error = RpcResultBuilder.newError(
300 ErrorType.RPC, "Invalid input.", "Constant value is null");
301 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
304 getSingletonConstantRegistration =
305 SingletonGetConstantService.registerNew(singletonService, domRpcService, input.getConstant());
307 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
311 public Future<RpcResult<Void>> registerDefaultConstant(RegisterDefaultConstantInput input) {
316 public Future<RpcResult<Void>> unregisterConstant() {
318 if (globalGetConstantRegistration == null) {
319 final RpcError rpcError = RpcResultBuilder
320 .newError(ErrorType.APPLICATION, "missing-registration", "No get-constant rpc registration present.");
321 final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
322 return Futures.immediateFuture(result);
325 globalGetConstantRegistration.close();
326 globalGetConstantRegistration = null;
328 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
332 public Future<RpcResult<UnregisterFlappingSingletonOutput>> unregisterFlappingSingleton() {
333 LOG.debug("unregister-flapping-singleton received.");
335 if (flappingSingletonService == null) {
336 final RpcError rpcError = RpcResultBuilder
337 .newError(ErrorType.APPLICATION, "missing-registration", "No flapping-singleton registration present.");
338 final RpcResult<UnregisterFlappingSingletonOutput> result =
339 RpcResultBuilder.<UnregisterFlappingSingletonOutput>failed().withRpcError(rpcError).build();
340 return Futures.immediateFuture(result);
343 final long flapCount = flappingSingletonService.setInactive();
344 flappingSingletonService = null;
346 final UnregisterFlappingSingletonOutput output =
347 new UnregisterFlappingSingletonOutputBuilder().setFlapCount(flapCount).build();
349 return Futures.immediateFuture(RpcResultBuilder.success(output).build());
353 public Future<RpcResult<Void>> addShardReplica(AddShardReplicaInput input) {
358 public Future<RpcResult<Void>> subscribeDdtl() {
360 if (ddtlReg != null) {
361 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
362 "There is already dataTreeChangeListener registered on id-ints list.");
363 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
366 idIntsDdtl = new IdIntsDOMDataTreeLIstener();
370 domDataTreeService.registerListener(idIntsDdtl,
371 Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION,
372 ProduceTransactionsHandler.ID_INTS_YID))
373 , true, Collections.emptyList());
374 } catch (DOMDataTreeLoopException e) {
375 LOG.error("Failed to register DOMDataTreeListener.", e);
379 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
383 public Future<RpcResult<Void>> registerBoundConstant(final RegisterBoundConstantInput input) {
384 LOG.debug("register-bound-constant: {}", input);
386 if (input.getContext() == null) {
387 final RpcError error = RpcResultBuilder.newError(
388 ErrorType.RPC, "Invalid input.", "Context value is null");
389 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
392 if (input.getConstant() == null) {
393 final RpcError error = RpcResultBuilder.newError(
394 ErrorType.RPC, "Invalid input.", "Constant value is null");
395 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
398 if (routedRegistrations.containsKey(input.getContext())) {
399 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
400 "There is already a rpc registered for context: " + input.getContext());
401 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
404 final DOMRpcImplementationRegistration<RoutedGetConstantService> registration =
405 RoutedGetConstantService.registerNew(bindingNormalizedNodeSerializer, domRpcService,
406 input.getConstant(), input.getContext());
408 routedRegistrations.put(input.getContext(), registration);
409 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
413 public Future<RpcResult<Void>> registerFlappingSingleton() {
414 LOG.debug("Received register-flapping-singleton.");
416 if (flappingSingletonService != null) {
417 final RpcError error = RpcResultBuilder.newError(
418 ErrorType.RPC, "Registration present.", "flappin-singleton already registered");
419 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
422 flappingSingletonService = new FlappingSingletonService(singletonService);
424 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
428 public Future<RpcResult<UnsubscribeDtclOutput>> unsubscribeDtcl() {
429 LOG.debug("Received unsubscribe-dtcl");
431 if (idIntsListener == null || dtclReg == null) {
432 final RpcError error = RpcResultBuilder.newError(
433 ErrorType.RPC, "Dtcl missing.", "No DataTreeChangeListener registered.");
434 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed().withRpcError(error).build());
437 final DOMDataReadOnlyTransaction rTx = domDataBroker.newReadOnlyTransaction();
439 if (dtclReg != null) {
444 final Optional<NormalizedNode<?, ?>> readResult =
445 rTx.read(CONTROLLER_CONFIG, WriteTransactionsHandler.ID_INTS_YID).checkedGet();
447 if (!readResult.isPresent()) {
448 final RpcError error = RpcResultBuilder.newError(
449 ErrorType.APPLICATION, "Final read empty.", "No data read from id-ints list.");
450 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
451 .withRpcError(error).build());
454 return Futures.immediateFuture(
455 RpcResultBuilder.success(new UnsubscribeDtclOutputBuilder()
456 .setCopyMatches(idIntsListener.checkEqual(readResult.get()))).build());
458 } catch (final ReadFailedException e) {
459 final RpcError error = RpcResultBuilder.newError(
460 ErrorType.APPLICATION, "Read failed.", "Final read from id-ints failed.");
461 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDtclOutput>failed()
462 .withRpcError(error).build());
468 public Future<RpcResult<Void>> createPrefixShard(final CreatePrefixShardInput input) {
469 LOG.debug("create-prefix-shard, input: {}", input);
471 return prefixShardHandler.onCreatePrefixShard(input);
475 public Future<RpcResult<Void>> deconfigureIdIntsShard() {
480 public Future<RpcResult<UnsubscribeYnlOutput>> unsubscribeYnl(final UnsubscribeYnlInput input) {
481 LOG.debug("Received unsubscribe-ynl, input: {}", input);
483 if (!ynlRegistrations.containsKey(input.getId())) {
484 final RpcError rpcError = RpcResultBuilder
485 .newError(ErrorType.APPLICATION, "missing-registration", "No ynl listener with this id registered.");
486 final RpcResult<UnsubscribeYnlOutput> result =
487 RpcResultBuilder.<UnsubscribeYnlOutput>failed().withRpcError(rpcError).build();
488 return Futures.immediateFuture(result);
491 final ListenerRegistration<YnlListener> registration = ynlRegistrations.remove(input.getId());
492 final UnsubscribeYnlOutput output = registration.getInstance().getOutput();
494 registration.close();
496 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeYnlOutput>success().withResult(output).build());
500 public Future<RpcResult<CheckPublishNotificationsOutput>> checkPublishNotifications(
501 final CheckPublishNotificationsInput input) {
503 final PublishNotificationsTask task = publishNotificationsTasks.get(input.getId());
506 return Futures.immediateFuture(RpcResultBuilder.success(
507 new CheckPublishNotificationsOutputBuilder().setActive(false)).build());
510 final CheckPublishNotificationsOutputBuilder checkPublishNotificationsOutputBuilder =
511 new CheckPublishNotificationsOutputBuilder().setActive(!task.isFinished());
513 if (task.getLastError() != null) {
514 final StringWriter sw = new StringWriter();
515 final PrintWriter pw = new PrintWriter(sw);
516 task.getLastError().printStackTrace(pw);
517 checkPublishNotificationsOutputBuilder.setLastError(task.getLastError().toString() + sw.toString());
520 final CheckPublishNotificationsOutput output =
521 checkPublishNotificationsOutputBuilder.setPublishCount(task.getCurrentNotif()).build();
523 return Futures.immediateFuture(RpcResultBuilder.success(output).build());
527 public Future<RpcResult<ProduceTransactionsOutput>> produceTransactions(final ProduceTransactionsInput input) {
528 LOG.debug("producer-transactions, input: {}", input);
530 final ProduceTransactionsHandler handler =
531 new ProduceTransactionsHandler(domDataTreeService, input);
533 final SettableFuture<RpcResult<ProduceTransactionsOutput>> settableFuture = SettableFuture.create();
534 handler.start(settableFuture);
536 return settableFuture;
540 public Future<RpcResult<Void>> registerConstant(final RegisterConstantInput input) {
542 LOG.debug("Received register-constant rpc, input: {}", input);
544 if (input.getConstant() == null) {
545 final RpcError error = RpcResultBuilder.newError(
546 ErrorType.RPC, "Invalid input.", "Constant value is null");
547 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
550 if (globalGetConstantRegistration != null) {
551 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
552 "There is already a get-constant rpc registered.");
553 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
556 globalGetConstantRegistration = GetConstantService.registerNew(domRpcService, input.getConstant());
557 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
561 public Future<RpcResult<Void>> unregisterDefaultConstant() {
566 public Future<RpcResult<UnsubscribeDdtlOutput>> unsubscribeDdtl() {
567 LOG.debug("Received unsubscribe-ddtl.");
569 if (idIntsDdtl == null || ddtlReg == null) {
570 final RpcError error = RpcResultBuilder.newError(
571 ErrorType.RPC, "Ddtl missing.", "No DOMDataTreeListener registered.");
572 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed().withRpcError(error).build());
578 final ReadListener readListener = new ReadListener();
580 final ListenerRegistration<ReadListener> registration = domDataTreeService.registerListener(readListener,
581 Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION,
582 ProduceTransactionsHandler.ID_INTS_YID))
583 , true, Collections.emptyList());
585 final DataTreeCandidate dataTreeCandidate = readListener.getFirstNotif().get();
586 registration.close();
588 if (!dataTreeCandidate.getRootNode().getDataAfter().isPresent()) {
589 final RpcError error = RpcResultBuilder.newError(
590 ErrorType.APPLICATION, "Final read empty.", "No data read from id-ints list.");
591 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
592 .withRpcError(error).build());
595 final NormalizedNode<?, ?> lastRead = dataTreeCandidate.getRootNode().getDataAfter().get();
597 return Futures.immediateFuture(
598 RpcResultBuilder.success(new UnsubscribeDdtlOutputBuilder()
599 .setCopyMatches(idIntsDdtl.checkEqual(lastRead))).build());
602 } catch (final DOMDataTreeLoopException | InterruptedException | ExecutionException e) {
603 LOG.error("Unable to read data to verify ddtl data.", e);
604 final RpcError error = RpcResultBuilder.newError(
605 ErrorType.APPLICATION, "Read failed.", "Final read from id-ints failed.");
606 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
607 .withRpcError(error).build());
611 private static class ReadListener implements DOMDataTreeListener {
613 private Collection<DataTreeCandidate> changes = null;
614 private SettableFuture<DataTreeCandidate> readFuture;
617 public synchronized void onDataTreeChanged(@Nonnull final Collection<DataTreeCandidate> changes,
618 @Nonnull final Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees) {
619 Preconditions.checkArgument(changes.size() == 1);
621 if (this.changes == null) {
622 this.changes = changes;
624 readFuture.set(changes.iterator().next());
629 public void onDataTreeFailed(@Nonnull final Collection<DOMDataTreeListeningException> causes) {
630 LOG.error("Read Listener failed. {}", causes);
633 public synchronized ListenableFuture<DataTreeCandidate> getFirstNotif() {
634 if (changes != null) {
635 return Futures.immediateFuture(changes.iterator().next());
638 readFuture = SettableFuture.create();