Fix infinite loop on cancel transaction
[mdsal.git] / dom / mdsal-dom-broker / src / main / java / org / opendaylight / mdsal / dom / broker / ShardedDOMReadTransactionAdapter.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.mdsal.dom.broker;
10
11 import com.google.common.base.Function;
12 import com.google.common.base.Optional;
13 import com.google.common.base.Preconditions;
14 import com.google.common.collect.Lists;
15 import com.google.common.util.concurrent.CheckedFuture;
16 import com.google.common.util.concurrent.FutureCallback;
17 import com.google.common.util.concurrent.Futures;
18 import com.google.common.util.concurrent.ListenableFuture;
19 import com.google.common.util.concurrent.MoreExecutors;
20 import com.google.common.util.concurrent.SettableFuture;
21 import java.util.Collection;
22 import java.util.Collections;
23 import java.util.List;
24 import java.util.Map;
25 import javax.annotation.Nullable;
26 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
27 import org.opendaylight.mdsal.common.api.ReadFailedException;
28 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
29 import org.opendaylight.mdsal.dom.api.DOMDataTreeListener;
30 import org.opendaylight.mdsal.dom.api.DOMDataTreeListeningException;
31 import org.opendaylight.mdsal.dom.api.DOMDataTreeLoopException;
32 import org.opendaylight.mdsal.dom.api.DOMDataTreeReadTransaction;
33 import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
34 import org.opendaylight.yangtools.concepts.ListenerRegistration;
35 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
36 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
37 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
38 import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41
42 public class ShardedDOMReadTransactionAdapter implements DOMDataTreeReadTransaction {
43
44     private static final Logger LOG = LoggerFactory.getLogger(ShardedDOMReadTransactionAdapter.class.getName());
45
46     private final List<ListenerRegistration<DOMDataTreeListener>> registrations = Lists.newArrayList();
47     private final DOMDataTreeService service;
48     private final Object txIdentifier;
49
50     private boolean finished = false;
51
52     ShardedDOMReadTransactionAdapter(final Object identifier, final DOMDataTreeService service) {
53         this.service = Preconditions.checkNotNull(service);
54         this.txIdentifier = Preconditions.checkNotNull(identifier);
55     }
56
57     @Override
58     public void close() {
59         // TODO should we also cancel all read futures?
60         LOG.debug("{}: Closing read transaction", txIdentifier);
61         if (finished) {
62             return;
63         }
64
65         registrations.forEach(ListenerRegistration::close);
66         finished = true;
67     }
68
69     @Override
70     public Object getIdentifier() {
71         return txIdentifier;
72     }
73
74     @Override
75     public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final LogicalDatastoreType store,
76             final YangInstanceIdentifier path) {
77         checkRunning();
78         LOG.debug("{}: Invoking read at {}:{}", txIdentifier, store, path);
79         final ListenerRegistration<DOMDataTreeListener> reg;
80         final SettableFuture<Optional<NormalizedNode<?, ?>>> initialDataTreeChangeFuture = SettableFuture.create();
81         try {
82             reg = service.registerListener(new ReadShardedListener(initialDataTreeChangeFuture),
83                     Collections.singleton(new DOMDataTreeIdentifier(store, path)), false, Collections.emptyList());
84             registrations.add(reg);
85         } catch (final DOMDataTreeLoopException e) {
86             // This should not happen, we are not specifying any
87             // producers when registering listener
88             throw new IllegalStateException("Loop in listener and producers detected", e);
89         }
90
91         // After data tree change future is finished, we can close the listener registration
92         Futures.addCallback(initialDataTreeChangeFuture, new FutureCallback<Optional<NormalizedNode<?, ?>>>() {
93             @Override
94             public void onSuccess(@Nullable final Optional<NormalizedNode<?, ?>> result) {
95                 reg.close();
96             }
97
98             @Override
99             public void onFailure(final Throwable throwable) {
100                 reg.close();
101             }
102         }, MoreExecutors.directExecutor());
103
104         return Futures.makeChecked(initialDataTreeChangeFuture, ReadFailedException.MAPPER);
105     }
106
107     @Override
108     public CheckedFuture<Boolean, ReadFailedException> exists(final LogicalDatastoreType store,
109             final YangInstanceIdentifier path) {
110         checkRunning();
111         LOG.debug("{}: Invoking exists at {}:{}", txIdentifier, store, path);
112         final Function<Optional<NormalizedNode<?, ?>>, Boolean> transform =
113             optionalNode -> optionalNode.isPresent() ? Boolean.TRUE : Boolean.FALSE;
114         final ListenableFuture<Boolean> existsResult = Futures.transform(read(store, path), transform,
115             MoreExecutors.directExecutor());
116         return Futures.makeChecked(existsResult, ReadFailedException.MAPPER);
117     }
118
119     private void checkRunning() {
120         Preconditions.checkState(!finished, "Transaction is already closed");
121     }
122
123     static class ReadShardedListener implements DOMDataTreeListener {
124
125         private final SettableFuture<Optional<NormalizedNode<?, ?>>> readResultFuture;
126
127         ReadShardedListener(final SettableFuture<Optional<NormalizedNode<?, ?>>> future) {
128             this.readResultFuture = Preconditions.checkNotNull(future);
129         }
130
131         @Override
132         public void onDataTreeChanged(final Collection<DataTreeCandidate> changes,
133                 final Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees) {
134             Preconditions.checkState(changes.size() == 1 && subtrees.size() == 1,
135                     "DOMDataTreeListener registered exactly on one subtree");
136
137             for (final DataTreeCandidate change : changes) {
138                 if (change.getRootNode().getModificationType().equals(ModificationType.UNMODIFIED)) {
139                     readResultFuture.set(Optional.absent());
140                     return;
141                 }
142             }
143
144             for (final NormalizedNode<?, ?> initialState : subtrees.values()) {
145                 readResultFuture.set(Optional.of(initialState));
146             }
147         }
148
149         @Override
150         public void onDataTreeFailed(final Collection<DOMDataTreeListeningException> causes) {
151             // TODO If we get just one exception, we don't need to do
152             // chaining
153
154             // We chain all exceptions and return aggregated one
155             readResultFuture.setException(new DOMDataTreeListeningException("Aggregated DOMDataTreeListening exception",
156                     causes.stream().reduce((e1, e2) -> {
157                         e1.addSuppressed(e2);
158                         return e1;
159                     }).get()));
160         }
161     }
162 }