/*
 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
9 package org.opendaylight.mdsal.dom.broker;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.mdsal.common.api.ReadFailedException;
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
import org.opendaylight.mdsal.dom.api.DOMDataTreeListener;
import org.opendaylight.mdsal.dom.api.DOMDataTreeListeningException;
import org.opendaylight.mdsal.dom.api.DOMDataTreeLoopException;
import org.opendaylight.mdsal.dom.api.DOMDataTreeReadTransaction;
import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
41 public class ShardedDOMReadTransactionAdapter implements DOMDataTreeReadTransaction {
43 private static final Logger LOG = LoggerFactory.getLogger(ShardedDOMReadTransactionAdapter.class.getName());
45 private final List<ListenerRegistration<DOMDataTreeListener>> registrations = Lists.newArrayList();
46 private final DOMDataTreeService service;
47 private final Object txIdentifier;
49 private boolean finished = false;
/**
 * Creates a read transaction adapter on top of the given data tree service.
 *
 * @param identifier transaction identifier, used as the prefix of log messages; must not be null
 * @param service service the listeners backing each read are registered with; must not be null
 * @throws NullPointerException if either argument is null
 */
ShardedDOMReadTransactionAdapter(final Object identifier, final DOMDataTreeService service) {
    this.service = Preconditions.checkNotNull(service, "service must not be null");
    this.txIdentifier = Preconditions.checkNotNull(identifier, "identifier must not be null");
}
58 // TODO should we also cancel all read futures?
59 LOG.debug("{}: Closing read transaction", txIdentifier);
60 if (finished == true) {
64 registrations.forEach(ListenerRegistration::close);
69 public Object getIdentifier() {
74 public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final LogicalDatastoreType store,
75 final YangInstanceIdentifier path) {
77 LOG.debug("{}: Invoking read at {}:{}", txIdentifier, store, path);
78 final ListenerRegistration<DOMDataTreeListener> reg;
79 final SettableFuture<Optional<NormalizedNode<?, ?>>> initialDataTreeChangeFuture = SettableFuture.create();
81 reg = service.registerListener(new ReadShardedListener(initialDataTreeChangeFuture),
82 Collections.singleton(new DOMDataTreeIdentifier(store, path)), false, Collections.emptyList());
83 registrations.add(reg);
84 } catch (final DOMDataTreeLoopException e) {
85 // This should not happen, we are not specifying any
86 // producers when registering listener
87 throw new IllegalStateException("Loop in listener and producers detected", e);
90 // After data tree change future is finished, we can close the listener registration
91 Futures.addCallback(initialDataTreeChangeFuture, new FutureCallback<Optional<NormalizedNode<?, ?>>>() {
93 public void onSuccess(@Nullable final Optional<NormalizedNode<?, ?>> result) {
98 public void onFailure(final Throwable throwable) {
103 return Futures.makeChecked(initialDataTreeChangeFuture, ReadFailedException.MAPPER);
107 public CheckedFuture<Boolean, ReadFailedException> exists(final LogicalDatastoreType store,
108 final YangInstanceIdentifier path) {
110 LOG.debug("{}: Invoking exists at {}:{}", txIdentifier, store, path);
111 final Function<Optional<NormalizedNode<?, ?>>, Boolean> transform =
112 optionalNode -> optionalNode.isPresent() ? Boolean.TRUE : Boolean.FALSE;
113 final ListenableFuture<Boolean> existsResult = Futures.transform(read(store, path), transform);
114 return Futures.makeChecked(existsResult, ReadFailedException.MAPPER);
117 private void checkRunning() {
118 Preconditions.checkState(finished == false, "Transaction is already closed");
121 static class ReadShardedListener implements DOMDataTreeListener {
123 private final SettableFuture<Optional<NormalizedNode<?, ?>>> readResultFuture;
/**
 * Creates a listener that reports its outcome through the given future.
 *
 * @param future future to complete with the result of the read; must not be null
 * @throws NullPointerException if {@code future} is null
 */
ReadShardedListener(final SettableFuture<Optional<NormalizedNode<?, ?>>> future) {
    this.readResultFuture = Preconditions.checkNotNull(future, "future must not be null");
}
130 public void onDataTreeChanged(final Collection<DataTreeCandidate> changes,
131 final Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees) {
132 Preconditions.checkState(changes.size() == 1 && subtrees.size() == 1,
133 "DOMDataTreeListener registered exactly on one subtree");
135 for (final DataTreeCandidate change : changes) {
136 if (change.getRootNode().getModificationType().equals(ModificationType.UNMODIFIED)) {
137 readResultFuture.set(Optional.absent());
142 for (final NormalizedNode initialState : subtrees.values()) {
143 readResultFuture.set(Optional.of(initialState));
148 public void onDataTreeFailed(final Collection<DOMDataTreeListeningException> causes) {
149 // TODO If we get just one exception, we don't need to do
152 // We chain all exceptions and return aggregated one
153 readResultFuture.setException(new DOMDataTreeListeningException("Aggregated DOMDataTreeListening exception",
154 causes.stream().reduce((e1, e2) -> {
155 e1.addSuppressed(e2);