2 * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.clustering.it.provider;
11 import com.google.common.base.Preconditions;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.SettableFuture;
15 import java.io.PrintWriter;
16 import java.io.StringWriter;
17 import java.util.Collection;
18 import java.util.Collections;
19 import java.util.HashMap;
21 import java.util.concurrent.ExecutionException;
22 import java.util.concurrent.Future;
23 import javax.annotation.Nonnull;
24 import org.opendaylight.controller.cluster.sharding.DistributedShardFactory;
25 import org.opendaylight.controller.clustering.it.provider.impl.FlappingSingletonService;
26 import org.opendaylight.controller.clustering.it.provider.impl.GetConstantService;
27 import org.opendaylight.controller.clustering.it.provider.impl.IdIntsDOMDataTreeLIstener;
28 import org.opendaylight.controller.clustering.it.provider.impl.PrefixShardHandler;
29 import org.opendaylight.controller.clustering.it.provider.impl.ProduceTransactionsHandler;
30 import org.opendaylight.controller.clustering.it.provider.impl.PublishNotificationsTask;
31 import org.opendaylight.controller.clustering.it.provider.impl.RoutedGetConstantService;
32 import org.opendaylight.controller.clustering.it.provider.impl.SingletonGetConstantService;
33 import org.opendaylight.controller.clustering.it.provider.impl.WriteTransactionsHandler;
34 import org.opendaylight.controller.clustering.it.provider.impl.YnlListener;
35 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
36 import org.opendaylight.controller.md.sal.binding.api.NotificationService;
37 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
38 import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationRegistration;
39 import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService;
40 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
41 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
42 import org.opendaylight.controller.sal.core.api.model.SchemaService;
43 import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
44 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
45 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
46 import org.opendaylight.mdsal.dom.api.DOMDataTreeListener;
47 import org.opendaylight.mdsal.dom.api.DOMDataTreeListeningException;
48 import org.opendaylight.mdsal.dom.api.DOMDataTreeLoopException;
49 import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
50 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
51 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
52 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.AddShardReplicaInput;
53 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomePrefixLeaderInput;
54 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsInput;
55 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutput;
56 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutputBuilder;
57 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CreatePrefixShardInput;
58 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.IsClientAbortedOutput;
59 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.OdlMdsalLowlevelControlService;
60 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsInput;
61 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutput;
62 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantInput;
63 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantInput;
64 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterDefaultConstantInput;
65 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantInput;
66 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardInput;
67 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemoveShardReplicaInput;
68 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsInput;
69 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlInput;
70 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantInput;
71 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutput;
72 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutputBuilder;
73 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutput;
74 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutputBuilder;
75 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutput;
76 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlInput;
77 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlOutput;
78 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsInput;
79 import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsOutput;
80 import org.opendaylight.yangtools.concepts.ListenerRegistration;
81 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
82 import org.opendaylight.yangtools.yang.common.RpcError;
83 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
84 import org.opendaylight.yangtools.yang.common.RpcResult;
85 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
86 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
87 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
88 import org.slf4j.Logger;
89 import org.slf4j.LoggerFactory;
91 public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService {
93 private static final Logger LOG = LoggerFactory.getLogger(MdsalLowLevelTestProvider.class);
95 private final RpcProviderRegistry rpcRegistry;
96 private final BindingAwareBroker.RpcRegistration<OdlMdsalLowlevelControlService> registration;
97 private final DistributedShardFactory distributedShardFactory;
98 private final DOMDataTreeService domDataTreeService;
99 private final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer;
100 private final DOMDataBroker domDataBroker;
101 private final NotificationPublishService notificationPublishService;
102 private final NotificationService notificationService;
103 private final SchemaService schemaService;
104 private final ClusterSingletonServiceProvider singletonService;
105 private final DOMRpcProviderService domRpcService;
106 private final PrefixShardHandler prefixShardHandler;
108 private Map<InstanceIdentifier<?>, DOMRpcImplementationRegistration<RoutedGetConstantService>> routedRegistrations =
111 private Map<String, ListenerRegistration<YnlListener>> ynlRegistrations = new HashMap<>();
113 private DOMRpcImplementationRegistration<GetConstantService> globalGetConstantRegistration = null;
114 private ClusterSingletonServiceRegistration getSingletonConstantRegistration;
115 private FlappingSingletonService flappingSingletonService;
116 private Map<String, PublishNotificationsTask> publishNotificationsTasks = new HashMap<>();
117 private ListenerRegistration<IdIntsDOMDataTreeLIstener> ddtlReg;
118 private IdIntsDOMDataTreeLIstener idIntsDdtl;
120 public MdsalLowLevelTestProvider(final RpcProviderRegistry rpcRegistry,
121 final DOMRpcProviderService domRpcService,
122 final ClusterSingletonServiceProvider singletonService,
123 final SchemaService schemaService,
124 final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer,
125 final NotificationPublishService notificationPublishService,
126 final NotificationService notificationService,
127 final DOMDataBroker domDataBroker,
128 final DOMDataTreeService domDataTreeService,
129 final DistributedShardFactory distributedShardFactory) {
130 this.rpcRegistry = rpcRegistry;
131 this.domRpcService = domRpcService;
132 this.singletonService = singletonService;
133 this.schemaService = schemaService;
134 this.bindingNormalizedNodeSerializer = bindingNormalizedNodeSerializer;
135 this.notificationPublishService = notificationPublishService;
136 this.notificationService = notificationService;
137 this.domDataBroker = domDataBroker;
138 this.domDataTreeService = domDataTreeService;
139 this.distributedShardFactory = distributedShardFactory;
141 registration = rpcRegistry.addRpcImplementation(OdlMdsalLowlevelControlService.class, this);
143 prefixShardHandler = new PrefixShardHandler(distributedShardFactory, bindingNormalizedNodeSerializer);
147 public Future<RpcResult<Void>> unregisterSingletonConstant() {
148 LOG.debug("unregister-singleton-constant");
150 if (getSingletonConstantRegistration == null) {
151 LOG.debug("No get-singleton-constant registration present.");
152 final RpcError rpcError = RpcResultBuilder
153 .newError(ErrorType.APPLICATION, "missing-registration", "No get-singleton-constant rpc registration present.");
154 final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
155 return Futures.immediateFuture(result);
159 getSingletonConstantRegistration.close();
160 getSingletonConstantRegistration = null;
162 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
163 } catch (final Exception e) {
164 LOG.debug("There was a problem closing the singleton constant service", e);
165 final RpcError rpcError = RpcResultBuilder
166 .newError(ErrorType.APPLICATION, "error-closing", "There was a problem closing get-singleton-constant");
167 final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
168 return Futures.immediateFuture(result);
173 public Future<RpcResult<Void>> startPublishNotifications(final StartPublishNotificationsInput input) {
174 LOG.debug("publish-notifications, input: {}", input);
176 final PublishNotificationsTask task = new PublishNotificationsTask(notificationPublishService, input.getId(),
177 input.getSeconds(), input.getNotificationsPerSecond());
179 publishNotificationsTasks.put(input.getId(), task);
183 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
187 public Future<RpcResult<Void>> subscribeDtcl() {
192 public Future<RpcResult<WriteTransactionsOutput>> writeTransactions(final WriteTransactionsInput input) {
193 LOG.debug("write-transactions, input: {}", input);
195 final WriteTransactionsHandler writeTransactionsHandler = new WriteTransactionsHandler(domDataBroker, input);
197 final SettableFuture<RpcResult<WriteTransactionsOutput>> settableFuture = SettableFuture.create();
198 writeTransactionsHandler.start(settableFuture);
200 return settableFuture;
204 public Future<RpcResult<IsClientAbortedOutput>> isClientAborted() {
209 public Future<RpcResult<Void>> removeShardReplica(RemoveShardReplicaInput input) {
214 public Future<RpcResult<Void>> subscribeYnl(final SubscribeYnlInput input) {
216 LOG.debug("subscribe-ynl, input: {}", input);
218 if (ynlRegistrations.containsKey(input.getId())) {
219 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
220 "There is already ynl listener registered for this id: " + input.getId());
221 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
224 ynlRegistrations.put(input.getId(),
225 notificationService.registerNotificationListener(new YnlListener(input.getId())));
227 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
231 public Future<RpcResult<Void>> removePrefixShard(final RemovePrefixShardInput input) {
232 LOG.debug("remove-prefix-shard, input: {}", input);
234 return prefixShardHandler.onRemovePrefixShard(input);
238 public Future<RpcResult<Void>> becomePrefixLeader(final BecomePrefixLeaderInput input) {
243 public Future<RpcResult<Void>> unregisterBoundConstant(final UnregisterBoundConstantInput input) {
244 LOG.debug("unregister-bound-constant, {}", input);
246 final DOMRpcImplementationRegistration<RoutedGetConstantService> registration =
247 routedRegistrations.remove(input.getContext());
249 if (registration == null) {
250 LOG.debug("No get-contexted-constant registration for context: {}", input.getContext());
251 final RpcError rpcError = RpcResultBuilder
252 .newError(ErrorType.APPLICATION, "missing-registration", "No get-constant rpc registration present.");
253 final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
254 return Futures.immediateFuture(result);
257 registration.close();
258 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
262 public Future<RpcResult<Void>> registerSingletonConstant(final RegisterSingletonConstantInput input) {
264 LOG.debug("Received register-singleton-constant rpc, input: {}", input);
266 if (input.getConstant() == null) {
267 final RpcError error = RpcResultBuilder.newError(
268 ErrorType.RPC, "Invalid input.", "Constant value is null");
269 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
272 getSingletonConstantRegistration =
273 SingletonGetConstantService.registerNew(singletonService, domRpcService, input.getConstant());
275 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
279 public Future<RpcResult<Void>> registerDefaultConstant(RegisterDefaultConstantInput input) {
284 public Future<RpcResult<Void>> unregisterConstant() {
286 if (globalGetConstantRegistration == null) {
287 final RpcError rpcError = RpcResultBuilder
288 .newError(ErrorType.APPLICATION, "missing-registration", "No get-constant rpc registration present.");
289 final RpcResult<Void> result = RpcResultBuilder.<Void>failed().withRpcError(rpcError).build();
290 return Futures.immediateFuture(result);
293 globalGetConstantRegistration.close();
294 globalGetConstantRegistration = null;
296 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
300 public Future<RpcResult<UnregisterFlappingSingletonOutput>> unregisterFlappingSingleton() {
301 LOG.debug("unregister-flapping-singleton received.");
303 if (flappingSingletonService == null) {
304 final RpcError rpcError = RpcResultBuilder
305 .newError(ErrorType.APPLICATION, "missing-registration", "No flapping-singleton registration present.");
306 final RpcResult<UnregisterFlappingSingletonOutput> result =
307 RpcResultBuilder.<UnregisterFlappingSingletonOutput>failed().withRpcError(rpcError).build();
308 return Futures.immediateFuture(result);
311 final long flapCount = flappingSingletonService.setInactive();
312 flappingSingletonService = null;
314 final UnregisterFlappingSingletonOutput output =
315 new UnregisterFlappingSingletonOutputBuilder().setFlapCount(flapCount).build();
317 return Futures.immediateFuture(RpcResultBuilder.success(output).build());
321 public Future<RpcResult<Void>> addShardReplica(AddShardReplicaInput input) {
326 public Future<RpcResult<Void>> subscribeDdtl() {
328 if (ddtlReg != null) {
329 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
330 "There is already dataTreeChangeListener registered on id-ints list.");
331 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
334 idIntsDdtl = new IdIntsDOMDataTreeLIstener();
338 domDataTreeService.registerListener(idIntsDdtl,
339 Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION,
340 ProduceTransactionsHandler.ID_INTS_YID))
341 , true, Collections.emptyList());
342 } catch (DOMDataTreeLoopException e) {
343 LOG.error("Failed to register DOMDataTreeListener.", e);
347 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
351 public Future<RpcResult<Void>> registerBoundConstant(final RegisterBoundConstantInput input) {
352 LOG.debug("register-bound-constant: {}", input);
354 if (input.getContext() == null) {
355 final RpcError error = RpcResultBuilder.newError(
356 ErrorType.RPC, "Invalid input.", "Context value is null");
357 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
360 if (input.getConstant() == null) {
361 final RpcError error = RpcResultBuilder.newError(
362 ErrorType.RPC, "Invalid input.", "Constant value is null");
363 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
366 if (routedRegistrations.containsKey(input.getContext())) {
367 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
368 "There is already a rpc registered for context: " + input.getContext());
369 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
372 final DOMRpcImplementationRegistration<RoutedGetConstantService> registration =
373 RoutedGetConstantService.registerNew(bindingNormalizedNodeSerializer, domRpcService,
374 input.getConstant(), input.getContext());
376 routedRegistrations.put(input.getContext(), registration);
377 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
381 public Future<RpcResult<Void>> registerFlappingSingleton() {
382 LOG.debug("Received register-flapping-singleton.");
384 if (flappingSingletonService != null) {
385 final RpcError error = RpcResultBuilder.newError(
386 ErrorType.RPC, "Registration present.", "flappin-singleton already registered");
387 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
390 flappingSingletonService = new FlappingSingletonService(singletonService);
392 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
396 public Future<RpcResult<UnsubscribeDtclOutput>> unsubscribeDtcl() {
401 public Future<RpcResult<Void>> createPrefixShard(final CreatePrefixShardInput input) {
402 LOG.debug("create-prefix-shard, input: {}", input);
404 return prefixShardHandler.onCreatePrefixShard(input);
408 public Future<RpcResult<Void>> deconfigureIdIntsShard() {
413 public Future<RpcResult<UnsubscribeYnlOutput>> unsubscribeYnl(final UnsubscribeYnlInput input) {
414 LOG.debug("Received unsubscribe-ynl, input: {}", input);
416 if (!ynlRegistrations.containsKey(input.getId())) {
417 final RpcError rpcError = RpcResultBuilder
418 .newError(ErrorType.APPLICATION, "missing-registration", "No ynl listener with this id registered.");
419 final RpcResult<UnsubscribeYnlOutput> result =
420 RpcResultBuilder.<UnsubscribeYnlOutput>failed().withRpcError(rpcError).build();
421 return Futures.immediateFuture(result);
424 final ListenerRegistration<YnlListener> registration = ynlRegistrations.remove(input.getId());
425 final UnsubscribeYnlOutput output = registration.getInstance().getOutput();
427 registration.close();
429 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeYnlOutput>success().withResult(output).build());
433 public Future<RpcResult<CheckPublishNotificationsOutput>> checkPublishNotifications(
434 final CheckPublishNotificationsInput input) {
436 final PublishNotificationsTask task = publishNotificationsTasks.get(input.getId());
439 return Futures.immediateFuture(RpcResultBuilder.success(
440 new CheckPublishNotificationsOutputBuilder().setActive(false)).build());
443 final CheckPublishNotificationsOutputBuilder checkPublishNotificationsOutputBuilder =
444 new CheckPublishNotificationsOutputBuilder().setActive(!task.isFinished());
446 if (task.getLastError() != null) {
447 final StringWriter sw = new StringWriter();
448 final PrintWriter pw = new PrintWriter(sw);
449 task.getLastError().printStackTrace(pw);
450 checkPublishNotificationsOutputBuilder.setLastError(task.getLastError().toString() + sw.toString());
453 final CheckPublishNotificationsOutput output =
454 checkPublishNotificationsOutputBuilder.setPublishCount(task.getCurrentNotif()).build();
456 return Futures.immediateFuture(RpcResultBuilder.success(output).build());
460 public Future<RpcResult<ProduceTransactionsOutput>> produceTransactions(final ProduceTransactionsInput input) {
461 LOG.debug("producer-transactions, input: {}", input);
463 final ProduceTransactionsHandler handler =
464 new ProduceTransactionsHandler(domDataTreeService, input);
466 final SettableFuture<RpcResult<ProduceTransactionsOutput>> settableFuture = SettableFuture.create();
467 handler.start(settableFuture);
469 return settableFuture;
473 public Future<RpcResult<Void>> registerConstant(final RegisterConstantInput input) {
475 LOG.debug("Received register-constant rpc, input: {}", input);
477 if (input.getConstant() == null) {
478 final RpcError error = RpcResultBuilder.newError(
479 ErrorType.RPC, "Invalid input.", "Constant value is null");
480 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
483 if (globalGetConstantRegistration != null) {
484 final RpcError error = RpcResultBuilder.newError(ErrorType.RPC, "Registration present.",
485 "There is already a get-constant rpc registered.");
486 return Futures.immediateFuture(RpcResultBuilder.<Void>failed().withRpcError(error).build());
489 globalGetConstantRegistration = GetConstantService.registerNew(domRpcService, input.getConstant());
490 return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
494 public Future<RpcResult<Void>> unregisterDefaultConstant() {
499 public Future<RpcResult<UnsubscribeDdtlOutput>> unsubscribeDdtl() {
500 LOG.debug("Received unsubscribe-ddtl.");
502 if (idIntsDdtl == null || ddtlReg == null) {
503 final RpcError error = RpcResultBuilder.newError(
504 ErrorType.RPC, "Ddtl missing.", "No DOMDataTreeListener registered.");
505 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed().withRpcError(error).build());
511 final ReadListener readListener = new ReadListener();
513 final ListenerRegistration<ReadListener> registration = domDataTreeService.registerListener(readListener,
514 Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION,
515 ProduceTransactionsHandler.ID_INTS_YID))
516 , true, Collections.emptyList());
518 final DataTreeCandidate dataTreeCandidate = readListener.getFirstNotif().get();
519 registration.close();
521 if (!dataTreeCandidate.getRootNode().getDataAfter().isPresent()) {
522 final RpcError error = RpcResultBuilder.newError(
523 ErrorType.APPLICATION, "Final read empty.", "No data read from id-ints list.");
524 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
525 .withRpcError(error).build());
528 final NormalizedNode<?, ?> lastRead = dataTreeCandidate.getRootNode().getDataAfter().get();
530 return Futures.immediateFuture(
531 RpcResultBuilder.success(new UnsubscribeDdtlOutputBuilder()
532 .setCopyMatches(idIntsDdtl.checkEqual(lastRead))).build());
535 } catch (final DOMDataTreeLoopException | InterruptedException | ExecutionException e) {
536 LOG.error("Unable to read data to verify ddtl data.", e);
537 final RpcError error = RpcResultBuilder.newError(
538 ErrorType.APPLICATION, "Read failed.", "Final read from id-ints failed.");
539 return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
540 .withRpcError(error).build());
544 private static class ReadListener implements DOMDataTreeListener {
546 private Collection<DataTreeCandidate> changes = null;
547 private SettableFuture<DataTreeCandidate> readFuture;
550 public synchronized void onDataTreeChanged(@Nonnull final Collection<DataTreeCandidate> changes,
551 @Nonnull final Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees) {
552 Preconditions.checkArgument(changes.size() == 1);
554 if (this.changes == null) {
555 this.changes = changes;
557 readFuture.set(changes.iterator().next());
562 public void onDataTreeFailed(@Nonnull final Collection<DOMDataTreeListeningException> causes) {
563 LOG.error("Read Listener failed. {}", causes);
566 public synchronized ListenableFuture<DataTreeCandidate> getFirstNotif() {
567 if (changes != null) {
568 return Futures.immediateFuture(changes.iterator().next());
571 readFuture = SettableFuture.create();