a4aaba2b219f94892db890f7da0dbcf50175103b
[mdsal.git] / dom / mdsal-dom-broker / src / main / java / org / opendaylight / mdsal / dom / broker / ShardedDOMReadTransactionAdapter.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.mdsal.dom.broker;
9
10 import com.google.common.base.Preconditions;
11 import com.google.common.collect.Lists;
12 import com.google.common.util.concurrent.FluentFuture;
13 import com.google.common.util.concurrent.FutureCallback;
14 import com.google.common.util.concurrent.MoreExecutors;
15 import com.google.common.util.concurrent.SettableFuture;
16 import java.util.Collection;
17 import java.util.Collections;
18 import java.util.List;
19 import java.util.Map;
20 import java.util.Optional;
21 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
22 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
23 import org.opendaylight.mdsal.dom.api.DOMDataTreeListener;
24 import org.opendaylight.mdsal.dom.api.DOMDataTreeListeningException;
25 import org.opendaylight.mdsal.dom.api.DOMDataTreeLoopException;
26 import org.opendaylight.mdsal.dom.api.DOMDataTreeReadTransaction;
27 import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
28 import org.opendaylight.yangtools.concepts.ListenerRegistration;
29 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
30 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
31 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
32 import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35
36 public class ShardedDOMReadTransactionAdapter implements DOMDataTreeReadTransaction {
37
38     private static final Logger LOG = LoggerFactory.getLogger(ShardedDOMReadTransactionAdapter.class.getName());
39
40     private final List<ListenerRegistration<DOMDataTreeListener>> registrations = Lists.newArrayList();
41     private final DOMDataTreeService service;
42     private final Object txIdentifier;
43
44     private boolean finished = false;
45
46     ShardedDOMReadTransactionAdapter(final Object identifier, final DOMDataTreeService service) {
47         this.service = Preconditions.checkNotNull(service);
48         this.txIdentifier = Preconditions.checkNotNull(identifier);
49     }
50
51     @Override
52     public void close() {
53         // TODO should we also cancel all read futures?
54         LOG.debug("{}: Closing read transaction", txIdentifier);
55         if (finished) {
56             return;
57         }
58
59         registrations.forEach(ListenerRegistration::close);
60         finished = true;
61     }
62
63     @Override
64     public Object getIdentifier() {
65         return txIdentifier;
66     }
67
68     @Override
69     public FluentFuture<Optional<NormalizedNode<?, ?>>> read(final LogicalDatastoreType store,
70             final YangInstanceIdentifier path) {
71         checkRunning();
72         LOG.debug("{}: Invoking read at {}:{}", txIdentifier, store, path);
73         final ListenerRegistration<DOMDataTreeListener> reg;
74         final SettableFuture<Optional<NormalizedNode<?, ?>>> initialDataTreeChangeFuture = SettableFuture.create();
75         try {
76             reg = service.registerListener(new ReadShardedListener(initialDataTreeChangeFuture),
77                     Collections.singleton(new DOMDataTreeIdentifier(store, path)), false, Collections.emptyList());
78             registrations.add(reg);
79         } catch (final DOMDataTreeLoopException e) {
80             // This should not happen, we are not specifying any
81             // producers when registering listener
82             throw new IllegalStateException("Loop in listener and producers detected", e);
83         }
84
85         // After data tree change future is finished, we can close the listener registration
86         initialDataTreeChangeFuture.addCallback(new FutureCallback<Optional<NormalizedNode<?, ?>>>() {
87             @Override
88             public void onSuccess(final Optional<NormalizedNode<?, ?>> result) {
89                 reg.close();
90             }
91
92             @Override
93             public void onFailure(final Throwable throwable) {
94                 reg.close();
95             }
96         }, MoreExecutors.directExecutor());
97
98         return initialDataTreeChangeFuture;
99     }
100
101     @Override
102     public FluentFuture<Boolean> exists(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
103         checkRunning();
104         LOG.debug("{}: Invoking exists at {}:{}", txIdentifier, store, path);
105         return read(store, path).transform(Optional::isPresent, MoreExecutors.directExecutor());
106     }
107
108     private void checkRunning() {
109         Preconditions.checkState(!finished, "Transaction is already closed");
110     }
111
112     static class ReadShardedListener implements DOMDataTreeListener {
113
114         private final SettableFuture<Optional<NormalizedNode<?, ?>>> readResultFuture;
115
116         ReadShardedListener(final SettableFuture<Optional<NormalizedNode<?, ?>>> future) {
117             this.readResultFuture = Preconditions.checkNotNull(future);
118         }
119
120         @Override
121         public void onDataTreeChanged(final Collection<DataTreeCandidate> changes,
122                 final Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees) {
123             Preconditions.checkState(changes.size() == 1 && subtrees.size() == 1,
124                     "DOMDataTreeListener registered exactly on one subtree");
125
126             for (final DataTreeCandidate change : changes) {
127                 if (change.getRootNode().getModificationType().equals(ModificationType.UNMODIFIED)) {
128                     readResultFuture.set(Optional.empty());
129                     return;
130                 }
131             }
132
133             for (final NormalizedNode<?, ?> initialState : subtrees.values()) {
134                 readResultFuture.set(Optional.of(initialState));
135             }
136         }
137
138         @Override
139         public void onDataTreeFailed(final Collection<DOMDataTreeListeningException> causes) {
140             // TODO If we get just one exception, we don't need to do
141             // chaining
142
143             // We chain all exceptions and return aggregated one
144             readResultFuture.setException(new DOMDataTreeListeningException("Aggregated DOMDataTreeListening exception",
145                     causes.stream().reduce((e1, e2) -> {
146                         e1.addSuppressed(e2);
147                         return e1;
148                     }).get()));
149         }
150     }
151 }