2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.mdsal.dom.broker;
10 import static com.google.common.base.Preconditions.checkState;
11 import static java.util.Objects.requireNonNull;
13 import com.google.common.util.concurrent.FluentFuture;
14 import com.google.common.util.concurrent.MoreExecutors;
15 import com.google.common.util.concurrent.SettableFuture;
16 import java.util.ArrayList;
17 import java.util.Collection;
18 import java.util.Collections;
19 import java.util.List;
21 import java.util.Optional;
22 import org.eclipse.jdt.annotation.NonNull;
23 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
24 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
25 import org.opendaylight.mdsal.dom.api.DOMDataTreeListener;
26 import org.opendaylight.mdsal.dom.api.DOMDataTreeListeningException;
27 import org.opendaylight.mdsal.dom.api.DOMDataTreeLoopException;
28 import org.opendaylight.mdsal.dom.api.DOMDataTreeReadTransaction;
29 import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
30 import org.opendaylight.yangtools.concepts.ListenerRegistration;
31 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
32 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
33 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
34 import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
/**
 * {@link DOMDataTreeReadTransaction} implementation backed by a {@link DOMDataTreeService}: each read
 * registers a listener with the service and hands the listener's result back as a future.
 */
38 public class ShardedDOMReadTransactionAdapter implements DOMDataTreeReadTransaction {
39 private static final Logger LOG = LoggerFactory.getLogger(ShardedDOMReadTransactionAdapter.class.getName());
// Listener registrations created by read(); they are closed in bulk when the transaction is closed.
41 private final List<ListenerRegistration<DOMDataTreeListener>> registrations = new ArrayList<>();
// Service on which the per-read listeners are registered.
42 private final @NonNull DOMDataTreeService service;
// Identifier of this transaction; it prefixes every log message emitted by this class.
43 private final @NonNull Object txIdentifier;
// Consulted by checkRunning(), which rejects further use once this flag is true.
45 private boolean finished = false;
/**
 * Creates an adapter bound to the given service.
 *
 * @param identifier transaction identifier, must not be null
 * @param service data tree service used to register listeners, must not be null
 * @throws NullPointerException if either argument is null
 */
47 ShardedDOMReadTransactionAdapter(final Object identifier, final DOMDataTreeService service) {
// Both fields are declared @NonNull, so null arguments are rejected up front.
48 this.service = requireNonNull(service);
49 this.txIdentifier = requireNonNull(identifier);
// Trace the shutdown of this transaction together with its identifier.
54 LOG.debug("{}: Closing read transaction", txIdentifier);
// Release every listener registration collected by read().
// NOTE(review): read() also closes a registration once its future completes, and no removal from
// this list is visible, so such registrations are closed a second time here - confirm that
// ListenerRegistration.close() tolerates repeated calls.
59 registrations.forEach(ListenerRegistration::close);
60 // TODO should we also cancel all read futures?
/**
 * Accessor for the identifier of this transaction.
 * NOTE(review): presumably returns {@code txIdentifier}; confirm against the method body.
 */
65 public Object getIdentifier() {
/**
 * Reads the subtree at {@code path} in {@code store} by registering a {@link ReadShardedListener}
 * with the service and exposing the listener's result as a future.
 *
 * @param store logical datastore to read from
 * @param path path of the subtree to read
 * @return future completed by the listener with the data it reports, or failed with the exception
 *         it reports
 * @throws IllegalStateException if the service rejects the registration with a
 *         {@link DOMDataTreeLoopException}
 */
70 public FluentFuture<Optional<NormalizedNode<?, ?>>> read(final LogicalDatastoreType store,
71 final YangInstanceIdentifier path) {
73 LOG.debug("{}: Invoking read at {}:{}", txIdentifier, store, path);
74 final ListenerRegistration<DOMDataTreeListener> reg;
// The listener completes this future; it carries the result the caller ultimately observes.
75 final SettableFuture<Optional<NormalizedNode<?, ?>>> initialDataTreeChangeFuture = SettableFuture.create();
77 reg = service.registerListener(new ReadShardedListener(initialDataTreeChangeFuture),
78 Collections.singleton(new DOMDataTreeIdentifier(store, path)), false, Collections.emptyList());
// Remember the registration so that closing the transaction can release it as well.
79 registrations.add(reg);
80 } catch (final DOMDataTreeLoopException e) {
81 // This should not happen: no producers are specified when registering the listener.
82 throw new IllegalStateException("Loop in listener and producers detected", e);
85 // Once the future has completed, the listener registration is no longer needed and can be closed.
86 initialDataTreeChangeFuture.addListener(reg::close, MoreExecutors.directExecutor());
// SettableFuture is not a FluentFuture subtype, so it must be adapted to the declared return type.
// FluentFuture.from() returns its argument unchanged when it already is a FluentFuture.
87 return FluentFuture.from(initialDataTreeChangeFuture);
/**
 * Checks whether any data is present at {@code path} in {@code store}.
 *
 * @param store logical datastore to query
 * @param path path of the subtree to check
 * @return future yielding {@code true} when the underlying read produces a non-empty Optional
 */
91 public FluentFuture<Boolean> exists(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
93 LOG.debug("{}: Invoking exists at {}:{}", txIdentifier, store, path);
// Delegate to read() and reduce its result to a presence flag; the mapping is trivial, so it is
// run on a direct executor rather than being handed off to another executor.
94 return read(store, path).transform(Optional::isPresent, MoreExecutors.directExecutor());
/**
 * Guards operations that require the transaction to still be open.
 *
 * @throws IllegalStateException if {@code finished} is already set
 */
97 private void checkRunning() {
98 checkState(!finished, "Transaction is already closed");
/**
 * Listener that feeds the notifications it receives into the future supplied at construction.
 */
101 static final class ReadShardedListener implements DOMDataTreeListener {
// Future that the callbacks of this listener complete or fail.
102 private final SettableFuture<Optional<NormalizedNode<?, ?>>> readResultFuture;
// The future is mandatory: a null argument fails immediately with NullPointerException.
104 ReadShardedListener(final SettableFuture<Optional<NormalizedNode<?, ?>>> future) {
105 this.readResultFuture = requireNonNull(future);
/**
 * Completes the read future from a change notification.
 *
 * @param changes change candidates; exactly one is expected
 * @param subtrees subtree data keyed by identifier; exactly one entry is expected
 * @throws IllegalStateException if either argument does not hold exactly one element
 */
109 public void onDataTreeChanged(final Collection<DataTreeCandidate> changes,
110 final Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees) {
// The listener is registered on a single subtree, so anything but one change and one subtree is a bug.
111 checkState(changes.size() == 1 && subtrees.size() == 1,
112 "DOMDataTreeListener registered exactly on one subtree");
// An UNMODIFIED root is reported to the reader as "no data": the future gets an empty Optional.
113 if (changes.iterator().next().getRootNode().getModificationType().equals(ModificationType.UNMODIFIED)) {
114 readResultFuture.set(Optional.empty());
// Complete with the single subtree value supplied with the notification.
116 readResultFuture.set(Optional.of(subtrees.values().iterator().next()));
121 public void onDataTreeFailed(final Collection<DOMDataTreeListeningException> causes) {
122 // TODO If we get just one exception, we don't need to do chaining
124 // We chain all exceptions and return aggregated one
125 readResultFuture.setException(new DOMDataTreeListeningException("Aggregated DOMDataTreeListening exception",
126 causes.stream().reduce((e1, e2) -> {
127 e1.addSuppressed(e2);