Merge "Remove remoterpc dead code."
[controller.git] / opendaylight / md-sal / sal-inmemory-datastore / src / main / java / org / opendaylight / controller / md / sal / dom / store / impl / InMemoryDOMDataStore.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.md.sal.dom.store.impl;
9
10 import com.google.common.base.Optional;
11 import com.google.common.base.Preconditions;
12 import com.google.common.util.concurrent.FutureCallback;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import com.google.common.util.concurrent.ListeningExecutorService;
16 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
17 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
18 import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException;
19 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
20 import org.opendaylight.controller.md.sal.dom.store.impl.SnapshotBackedWriteTransaction.TransactionReadyPrototype;
21 import org.opendaylight.yangtools.yang.data.api.schema.tree.ConflictingModificationAppliedException;
22 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
23 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
24 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
25 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
26 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
27 import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerTree;
28 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
29 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
30 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
31 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
32 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
33 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
34 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
35 import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
36 import org.opendaylight.yangtools.concepts.Identifiable;
37 import org.opendaylight.yangtools.concepts.ListenerRegistration;
38 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
39 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
40 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
41 import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
44
45 import javax.annotation.concurrent.GuardedBy;
46 import java.util.Collections;
47 import java.util.concurrent.Callable;
48 import java.util.concurrent.atomic.AtomicLong;
49
50 import static com.google.common.base.Preconditions.checkState;
51
52 /**
53  * In-memory DOM Data Store
54  *
55  * Implementation of {@link DOMStore} which uses {@link DataTree} and other
 * classes such as {@link SnapshotBackedWriteTransaction},
57  * {@link SnapshotBackedReadTransaction} and {@link ResolveDataChangeEventsTask}
58  * to implement {@link DOMStore} contract.
59  *
60  */
61 public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, SchemaContextListener,
62         TransactionReadyPrototype,AutoCloseable {
63     private static final Logger LOG = LoggerFactory.getLogger(InMemoryDOMDataStore.class);
64     private final DataTree dataTree = InMemoryDataTreeFactory.getInstance().create();
65     private final ListenerTree listenerTree = ListenerTree.create();
66     private final AtomicLong txCounter = new AtomicLong(0);
67     private final ListeningExecutorService executor;
68
69     private final String name;
70
71     public InMemoryDOMDataStore(final String name, final ListeningExecutorService executor) {
72         this.name = Preconditions.checkNotNull(name);
73         this.executor = Preconditions.checkNotNull(executor);
74     }
75
76     @Override
77     public final String getIdentifier() {
78         return name;
79     }
80
81     @Override
82     public DOMStoreReadTransaction newReadOnlyTransaction() {
83         return new SnapshotBackedReadTransaction(nextIdentifier(), dataTree.takeSnapshot());
84     }
85
86     @Override
87     public DOMStoreReadWriteTransaction newReadWriteTransaction() {
88         return new SnapshotBackedReadWriteTransaction(nextIdentifier(), dataTree.takeSnapshot(), this);
89     }
90
91     @Override
92     public DOMStoreWriteTransaction newWriteOnlyTransaction() {
93         return new SnapshotBackedWriteTransaction(nextIdentifier(), dataTree.takeSnapshot(), this);
94     }
95
96     @Override
97     public DOMStoreTransactionChain createTransactionChain() {
98         return new DOMStoreTransactionChainImpl();
99     }
100
101     @Override
102     public synchronized void onGlobalContextUpdated(final SchemaContext ctx) {
103         dataTree.setSchemaContext(ctx);
104     }
105
106     @Override
107     public void close(){
108         executor.shutdownNow();
109     }
110     @Override
111     public <L extends AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> ListenerRegistration<L> registerChangeListener(
112             final InstanceIdentifier path, final L listener, final DataChangeScope scope) {
113
114         /*
115          * Make sure commit is not occurring right now. Listener has to be
116          * registered and its state capture enqueued at a consistent point.
117          *
118          * FIXME: improve this to read-write lock, such that multiple listener
119          * registrations can occur simultaneously
120          */
121         final DataChangeListenerRegistration<L> reg;
122         synchronized (this) {
123             LOG.debug("{}: Registering data change listener {} for {}", name, listener, path);
124
125             reg = listenerTree.registerDataChangeListener(path, listener, scope);
126
127             Optional<NormalizedNode<?, ?>> currentState = dataTree.takeSnapshot().readNode(path);
128             if (currentState.isPresent()) {
129                 final NormalizedNode<?, ?> data = currentState.get();
130
131                 final DOMImmutableDataChangeEvent event = DOMImmutableDataChangeEvent.builder(DataChangeScope.BASE) //
132                         .setAfter(data) //
133                         .addCreated(path, data) //
134                         .build();
135                 executor.submit(new ChangeListenerNotifyTask(Collections.singletonList(reg), event));
136             }
137         }
138
139         return new AbstractListenerRegistration<L>(listener) {
140             @Override
141             protected void removeRegistration() {
142                 synchronized (InMemoryDOMDataStore.this) {
143                     reg.close();
144                 }
145             }
146         };
147     }
148
149     @Override
150     public synchronized DOMStoreThreePhaseCommitCohort ready(final SnapshotBackedWriteTransaction writeTx) {
151         LOG.debug("Tx: {} is submitted. Modifications: {}", writeTx.getIdentifier(), writeTx.getMutatedView());
152         return new ThreePhaseCommitImpl(writeTx);
153     }
154
155     private Object nextIdentifier() {
156         return name + "-" + txCounter.getAndIncrement();
157     }
158
159     private class DOMStoreTransactionChainImpl implements DOMStoreTransactionChain, TransactionReadyPrototype {
160
161         @GuardedBy("this")
162         private SnapshotBackedWriteTransaction latestOutstandingTx;
163
164         private boolean chainFailed = false;
165
166         private void checkFailed() {
167             Preconditions.checkState(!chainFailed, "Transaction chain is failed.");
168         }
169
170         @Override
171         public synchronized DOMStoreReadTransaction newReadOnlyTransaction() {
172             final DataTreeSnapshot snapshot;
173             checkFailed();
174             if (latestOutstandingTx != null) {
175                 checkState(latestOutstandingTx.isReady(), "Previous transaction in chain must be ready.");
176                 snapshot = latestOutstandingTx.getMutatedView();
177             } else {
178                 snapshot = dataTree.takeSnapshot();
179             }
180             return new SnapshotBackedReadTransaction(nextIdentifier(), snapshot);
181         }
182
183         @Override
184         public synchronized DOMStoreReadWriteTransaction newReadWriteTransaction() {
185             final DataTreeSnapshot snapshot;
186             checkFailed();
187             if (latestOutstandingTx != null) {
188                 checkState(latestOutstandingTx.isReady(), "Previous transaction in chain must be ready.");
189                 snapshot = latestOutstandingTx.getMutatedView();
190             } else {
191                 snapshot = dataTree.takeSnapshot();
192             }
193             final SnapshotBackedReadWriteTransaction ret = new SnapshotBackedReadWriteTransaction(nextIdentifier(),
194                     snapshot, this);
195             latestOutstandingTx = ret;
196             return ret;
197         }
198
199         @Override
200         public synchronized DOMStoreWriteTransaction newWriteOnlyTransaction() {
201             final DataTreeSnapshot snapshot;
202             checkFailed();
203             if (latestOutstandingTx != null) {
204                 checkState(latestOutstandingTx.isReady(), "Previous transaction in chain must be ready.");
205                 snapshot = latestOutstandingTx.getMutatedView();
206             } else {
207                 snapshot = dataTree.takeSnapshot();
208             }
209             final SnapshotBackedWriteTransaction ret = new SnapshotBackedWriteTransaction(nextIdentifier(), snapshot,
210                     this);
211             latestOutstandingTx = ret;
212             return ret;
213         }
214
215         @Override
216         public DOMStoreThreePhaseCommitCohort ready(final SnapshotBackedWriteTransaction tx) {
217             DOMStoreThreePhaseCommitCohort storeCohort = InMemoryDOMDataStore.this.ready(tx);
218             return new ChainedTransactionCommitImpl(tx, storeCohort, this);
219         }
220
221         @Override
222         public void close() {
223
224              executor.shutdownNow();
225
226         }
227
228         protected synchronized void onTransactionFailed(final SnapshotBackedWriteTransaction transaction,
229                 final Throwable t) {
230             chainFailed = true;
231
232         }
233
234         public synchronized void onTransactionCommited(final SnapshotBackedWriteTransaction transaction) {
235             // If commited transaction is latestOutstandingTx we clear
236             // latestOutstandingTx
237             // field in order to base new transactions on Datastore Data Tree
238             // directly.
239             if (transaction.equals(latestOutstandingTx)) {
240                 latestOutstandingTx = null;
241             }
242         }
243
244     }
245
246     private static class ChainedTransactionCommitImpl implements DOMStoreThreePhaseCommitCohort {
247
248         private final SnapshotBackedWriteTransaction transaction;
249         private final DOMStoreThreePhaseCommitCohort delegate;
250
251         private final DOMStoreTransactionChainImpl txChain;
252
253         protected ChainedTransactionCommitImpl(final SnapshotBackedWriteTransaction transaction,
254                 final DOMStoreThreePhaseCommitCohort delegate, final DOMStoreTransactionChainImpl txChain) {
255             super();
256             this.transaction = transaction;
257             this.delegate = delegate;
258             this.txChain = txChain;
259         }
260
261         @Override
262         public ListenableFuture<Boolean> canCommit() {
263             return delegate.canCommit();
264         }
265
266         @Override
267         public ListenableFuture<Void> preCommit() {
268             return delegate.preCommit();
269         }
270
271         @Override
272         public ListenableFuture<Void> abort() {
273             return delegate.abort();
274         }
275
276         @Override
277         public ListenableFuture<Void> commit() {
278             ListenableFuture<Void> commitFuture = delegate.commit();
279             Futures.addCallback(commitFuture, new FutureCallback<Void>() {
280                 @Override
281                 public void onFailure(final Throwable t) {
282                     txChain.onTransactionFailed(transaction, t);
283                 }
284
285                 @Override
286                 public void onSuccess(final Void result) {
287                     txChain.onTransactionCommited(transaction);
288                 }
289
290             });
291             return commitFuture;
292         }
293
294     }
295
296     private class ThreePhaseCommitImpl implements DOMStoreThreePhaseCommitCohort {
297
298         private final SnapshotBackedWriteTransaction transaction;
299         private final DataTreeModification modification;
300
301         private ResolveDataChangeEventsTask listenerResolver;
302         private DataTreeCandidate candidate;
303
304         public ThreePhaseCommitImpl(final SnapshotBackedWriteTransaction writeTransaction) {
305             this.transaction = writeTransaction;
306             this.modification = transaction.getMutatedView();
307         }
308
309         @Override
310         public ListenableFuture<Boolean> canCommit() {
311             return executor.submit(new Callable<Boolean>() {
312                 @Override
313                 public Boolean call() throws TransactionCommitFailedException {
314                     try {
315                         dataTree.validate(modification);
316                         LOG.debug("Store Transaction: {} can be committed", transaction.getIdentifier());
317                         return true;
318                     } catch (ConflictingModificationAppliedException e) {
319                         LOG.warn("Store Tx: {} Conflicting modification for {}.", transaction.getIdentifier(),
320                                 e.getPath());
321                         throw new OptimisticLockFailedException("Optimistic lock failed.",e);
322                     } catch (DataValidationFailedException e) {
323                         LOG.warn("Store Tx: {} Data Precondition failed for {}.", transaction.getIdentifier(),
324                                 e.getPath(), e);
325                         throw new TransactionCommitFailedException("Data did not pass validation.",e);
326                     }
327                 }
328             });
329         }
330
331         @Override
332         public ListenableFuture<Void> preCommit() {
333             return executor.submit(new Callable<Void>() {
334                 @Override
335                 public Void call() {
336                     candidate = dataTree.prepare(modification);
337                     listenerResolver = ResolveDataChangeEventsTask.create(candidate, listenerTree);
338                     return null;
339                 }
340             });
341         }
342
343         @Override
344         public ListenableFuture<Void> abort() {
345             candidate = null;
346             return Futures.immediateFuture(null);
347         }
348
349         @Override
350         public ListenableFuture<Void> commit() {
351             checkState(candidate != null, "Proposed subtree must be computed");
352
353             /*
354              * The commit has to occur atomically with regard to listener
355              * registrations.
356              */
357             synchronized (this) {
358                 dataTree.commit(candidate);
359
360                 for (ChangeListenerNotifyTask task : listenerResolver.call()) {
361                     LOG.trace("Scheduling invocation of listeners: {}", task);
362                     executor.submit(task);
363                 }
364             }
365
366             return Futures.immediateFuture(null);
367         }
368     }
369 }