1 package org.opendaylight.controller.sal.binding.impl
3 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler
4 import org.opendaylight.controller.sal.binding.api.data.DataChangeListener
5 import org.opendaylight.controller.sal.binding.api.data.DataProviderService
6 import org.opendaylight.yangtools.yang.binding.DataObject
7 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier
8 import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction
9 import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction.DataTransactionListener
10 import org.opendaylight.controller.md.sal.common.api.TransactionStatus
11 import org.opendaylight.controller.md.sal.common.impl.AbstractDataModification
12 import org.opendaylight.controller.md.sal.common.api.data.DataReader
13 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration
14 import org.opendaylight.yangtools.concepts.ListenerRegistration
15 import static extension org.opendaylight.controller.sal.binding.impl.util.MapUtils.*;
16 import java.util.Collection
17 import java.util.Map.Entry
18 import java.util.HashSet
20 import com.google.common.collect.Multimap
21 import static com.google.common.base.Preconditions.*;
23 import java.util.LinkedList
24 import org.opendaylight.controller.sal.binding.api.data.RuntimeDataProvider
25 import com.google.common.collect.HashMultimap
26 import java.util.concurrent.ExecutorService
27 import java.util.concurrent.Callable
28 import org.opendaylight.yangtools.yang.common.RpcResult
29 import org.opendaylight.controller.sal.common.util.Rpcs
30 import java.util.Collections
31 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction
32 import java.util.ArrayList
33 import org.opendaylight.controller.sal.common.util.RpcErrors
// Binding-aware data broker. Keeps registries of data readers, data change
// listeners and commit handlers keyed by InstanceIdentifier, creates
// transactions, and submits commits to an ExecutorService.
35 class DataBrokerImpl extends DeprecatedDataAPISupport implements DataProviderService {
// Executor that runs the TwoPhaseCommit tasks submitted by commit().
// NOTE(review): no initialisation or null check is visible here - assumes it
// is set before commit() is called; confirm.
38 var ExecutorService executor;
// Registries, keyed by the path used at registration time.
// NOTE(review): these are plain HashMultimap instances, while commit handlers
// are read from the executor thread through getActiveCommitHandlers() -
// confirm that registration/removal cannot race with a running commit.
// Reader registrations consulted by readConfigurationData().
40 Multimap<InstanceIdentifier, DataReaderRegistration> configReaders = HashMultimap.create();
// Reader registrations consulted by readOperationalData().
41 Multimap<InstanceIdentifier, DataReaderRegistration> operationalReaders = HashMultimap.create();
// Data change listener registrations.
42 Multimap<InstanceIdentifier, DataChangeListenerRegistration> listeners = HashMultimap.create();
// Commit handler registrations; their handlers take part in TwoPhaseCommit.
43 Multimap<InstanceIdentifier, DataCommitHandlerRegistration> commitHandlers = HashMultimap.create();
// Creates a new transaction bound to this broker.
45 override beginTransaction() {
46 return new DataTransactionImpl(this);
// Reads configuration data for `path`: selects reader registrations with
// getAllChildren(path) and delegates to readConfiguration().
// NOTE(review): getAllChildren is presumably the MapUtils extension imported
// with `import static extension`; its matching rules are not visible here.
49 override readConfigurationData(InstanceIdentifier<? extends DataObject> path) {
50 val readers = configReaders.getAllChildren(path);
51 return readers.readConfiguration(path);
// Reads operational data for `path`: same flow as readConfigurationData(),
// but over operationalReaders and readOperational().
54 override readOperationalData(InstanceIdentifier<? extends DataObject> path) {
55 val readers = operationalReaders.getAllChildren(path);
56 return readers.readOperational(path);
// Wraps `commitHandler` in a DataCommitHandlerRegistration and records it
// under `path` in commitHandlers.
59 override registerCommitHandler(InstanceIdentifier<? extends DataObject> path,
60 DataCommitHandler<InstanceIdentifier<? extends DataObject>, DataObject> commitHandler) {
61 val registration = new DataCommitHandlerRegistration(path,commitHandler,this);
62 commitHandlers.put(path,registration)
// Wraps `listener` in a DataChangeListenerRegistration and records it under
// `path` in listeners.
66 override registerDataChangeListener(InstanceIdentifier<? extends DataObject> path, DataChangeListener listener) {
67 val reg = new DataChangeListenerRegistration(path, listener, this);
68 listeners.put(path, reg);
// Registers `provider` as a reader for `path`. One registration object is put
// into both configReaders and operationalReaders, so the same reader serves
// configuration and operational reads.
72 override registerDataReader(InstanceIdentifier<? extends DataObject> path,
73 DataReader<InstanceIdentifier<? extends DataObject>, DataObject> provider) {
74 val ret = new DataReaderRegistration(provider, this);
76 configReaders.put(path, ret);
77 operationalReaders.put(path, ret);
// Removes `reader` from both reader registries for every path in reader.paths.
// NOTE(review): removal is driven only by reader.paths; no visible line in
// registerDataReader() adds the registered path to that set - confirm it is
// populated, otherwise this loop removes nothing.
81 protected def removeReader(DataReaderRegistration reader) {
82 for (path : reader.paths) {
83 operationalReaders.remove(path, reader);
84 configReaders.remove(path, reader);
// Removes a listener registration from listeners using the path stored on it.
88 protected def removeListener(DataChangeListenerRegistration registration) {
89 listeners.remove(registration.path, registration);
// Removes a commit handler registration from commitHandlers using the path
// stored on it.
92 protected def removeCommitHandler(DataCommitHandlerRegistration registration) {
93 commitHandlers.remove(registration.path, registration);
// Asks every reader in `entries` for the configuration data at `path`, in
// iteration order, and passes the collected partial results to merge().
// Called as an extension method from readConfigurationData().
96 protected def DataObject readConfiguration(
97 Collection<Entry<? extends InstanceIdentifier, ? extends DataReaderRegistration>> entries,
98 InstanceIdentifier<? extends DataObject> path) {
100 val List<DataObject> partialResults = new LinkedList();
101 for (entry : entries) {
// The reader's result is added as-is, without a null check.
102 partialResults.add(entry.value.instance.readConfigurationData(path))
104 return merge(path, partialResults);
// Operational counterpart of readConfiguration(): asks every reader in
// `entries` for the operational data at `path` and merges the results.
107 protected def DataObject readOperational(
108 Collection<Entry<? extends InstanceIdentifier, ? extends DataReaderRegistration>> entries,
109 InstanceIdentifier<? extends DataObject> path) {
111 val List<DataObject> partialResults = new LinkedList();
112 for (entry : entries) {
// The reader's result is added as-is, without a null check.
113 partialResults.add(entry.value.instance.readOperationalData(path))
115 return merge(path, partialResults);
// Combines the partial results read for `identifier`. No real merge is
// performed yet: when the list is non-empty the first element is returned and
// the rest are ignored.
// NOTE(review): callers add reader results unchecked, so the first element may
// be null even when a later reader returned data - confirm this is acceptable.
118 protected def DataObject merge(InstanceIdentifier<? extends DataObject> identifier, List<DataObject> objects) {
120 // FIXME: implement real merge
121 if (objects.size > 0) {
122 return objects.get(0);
// Returns the handler instances of all registered commit handlers as a set,
// so equal handlers registered under several paths appear once.
126 protected def getActiveCommitHandlers() {
128 return commitHandlers.entries.map[ value.instance].toSet
// Starts committing `transaction`: rejects null, marks the transaction as
// SUBMITED, and submits a TwoPhaseCommit task to the executor. Returns
// whatever executor.submit() returns for that task.
131 protected def commit(DataTransactionImpl transaction) {
132 checkNotNull(transaction);
133 transaction.changeStatus(TransactionStatus.SUBMITED);
134 val task = new TwoPhaseCommit(transaction, this);
135 return executor.submit(task);
// Registration handle for a DataReader registered with a DataBrokerImpl.
// Removing the registration asks the broker to drop this reader.
140 package class DataReaderRegistration extends //
141 AbstractObjectRegistration<DataReader<InstanceIdentifier<? extends DataObject>, DataObject>> {
// Broker that removeRegistration() calls back into.
143 DataBrokerImpl dataBroker;
// Paths associated with this reader; the broker's removeReader() iterates
// over them. Starts out as an empty HashSet (see constructor).
146 val Set<InstanceIdentifier<? extends DataObject>> paths;
// Creates a registration for `instance`.
// NOTE(review): `broker` is presumably stored in dataBroker; that assignment
// is not visible here - confirm.
148 new(DataReader<InstanceIdentifier<? extends DataObject>, DataObject> instance, DataBrokerImpl broker) {
// `_paths` is presumably the backing field of the `paths` property - confirm.
151 _paths = new HashSet();
// Delegates removal of this registration to the broker.
154 override protected removeRegistration() {
155 dataBroker.removeReader(this);
// Registration handle for a DataChangeListener registered with a
// DataBrokerImpl. Removing the registration asks the broker to drop it.
160 package class DataChangeListenerRegistration extends AbstractObjectRegistration<DataChangeListener> implements ListenerRegistration<DataChangeListener> {
// Broker that removeRegistration() calls back into.
162 DataBrokerImpl dataBroker;
// Path of this registration; the broker's removeListener() uses it as the
// key when removing the registration.
165 val InstanceIdentifier<?> path;
// Creates a registration of `instance` for `path`.
// NOTE(review): the assignments of `path` and `broker` to the fields are not
// visible here - confirm.
167 new(InstanceIdentifier<?> path, DataChangeListener instance, DataBrokerImpl broker) {
// Delegates removal of this registration to the broker.
173 override protected removeRegistration() {
174 dataBroker.removeListener(this);
// Registration handle for a DataCommitHandler registered with a
// DataBrokerImpl. Removing the registration asks the broker to drop it.
180 package class DataCommitHandlerRegistration //
181 extends AbstractObjectRegistration<DataCommitHandler<InstanceIdentifier<? extends DataObject>, DataObject>> {
// Broker that removeRegistration() calls back into.
183 DataBrokerImpl dataBroker;
// Path of this registration; the broker's removeCommitHandler() uses it as
// the key when removing the registration.
186 val InstanceIdentifier<?> path;
// Creates a registration of `instance` for `path`.
// NOTE(review): the assignments of `path` and `broker` to the fields are not
// visible here - confirm.
188 new(InstanceIdentifier<?> path, DataCommitHandler<InstanceIdentifier<? extends DataObject>, DataObject> instance,
189 DataBrokerImpl broker) {
// Delegates removal of this registration to the broker.
195 override protected removeRegistration() {
196 dataBroker.removeCommitHandler(this);
// Callable that commits one transaction against all active commit handlers of
// a broker in two phases: request a commit from every handler, then finish
// every handler transaction. Any exception in either phase triggers rollback().
202 package class TwoPhaseCommit implements Callable<RpcResult<TransactionStatus>> {
// Transaction being committed; passed to each handler's requestCommit().
204 val DataTransactionImpl transaction;
// Broker whose active commit handlers take part in the commit.
205 val DataBrokerImpl dataBroker;
// Stores the transaction to commit and the broker that supplies the handlers.
207 new(DataTransactionImpl transaction, DataBrokerImpl broker) {
208 this.transaction = transaction;
209 this.dataBroker = broker;
// Runs the commit. Returns a successful result carrying COMMITED when both
// phases complete without an exception, otherwise the result of rollback().
212 override call() throws Exception {
// Handlers are fetched once, at the start of the commit.
214 val Iterable<DataCommitHandler<InstanceIdentifier<? extends DataObject>, DataObject>> commitHandlers = dataBroker.activeCommitHandlers;
216 // requesting commits
217 val List<DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject>> handlerTransactions = new ArrayList();
// Phase 1: every handler is asked to prepare; its handler transaction is kept
// for phase 2 and for a possible rollback.
219 for (handler : commitHandlers) {
220 handlerTransactions.add(handler.requestCommit(transaction));
// Only the handler transactions collected before the failure are rolled back.
222 } catch (Exception e) {
223 return rollback(handlerTransactions,e);
// Phase 2: finish every prepared handler transaction.
// NOTE(review): the collected results are not inspected in the visible code,
// so an unsuccessful RpcResult from finish() would not cause a rollback -
// confirm.
225 val List<RpcResult<Void>> results = new ArrayList();
227 for (subtransaction : handlerTransactions) {
228 results.add(subtransaction.finish());
// NOTE(review): rollback() receives the full list, including handler
// transactions whose finish() already returned - confirm rollback after
// finish is supported by handlers.
230 } catch (Exception e) {
231 return rollback(handlerTransactions,e);
234 return Rpcs.getRpcResult(true, TransactionStatus.COMMITED, Collections.emptySet());
// Calls rollback() on every given handler transaction and returns a failed
// result carrying FAILED with an empty error set. The exception `e` is not
// used in the visible code, so its cause is not reported to the caller.
237 def rollback(List<DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject>> transactions,Exception e) {
238 for (transaction : transactions) {
239 transaction.rollback()
241 // FIXME return encountered error.
242 return Rpcs.getRpcResult(false, TransactionStatus.FAILED, Collections.emptySet());