1 package org.opendaylight.controller.sal.binding.impl
3 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler
4 import org.opendaylight.controller.sal.binding.api.data.DataChangeListener
5 import org.opendaylight.controller.sal.binding.api.data.DataProviderService
6 import org.opendaylight.yangtools.yang.binding.DataObject
7 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier
8 import org.opendaylight.controller.md.sal.common.api.TransactionStatus
9 import org.opendaylight.controller.md.sal.common.api.data.DataReader
10 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration
11 import org.opendaylight.yangtools.concepts.ListenerRegistration
12 import com.google.common.collect.Multimap
13 import static com.google.common.base.Preconditions.*;
15 import com.google.common.collect.HashMultimap
16 import java.util.concurrent.ExecutorService
17 import java.util.concurrent.Callable
18 import org.opendaylight.yangtools.yang.common.RpcResult
19 import org.opendaylight.controller.sal.common.util.Rpcs
20 import java.util.Collections
21 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction
22 import java.util.ArrayList
23 import org.opendaylight.controller.sal.binding.impl.util.BindingAwareDataReaderRouter
24 import org.opendaylight.yangtools.concepts.CompositeObjectRegistration
25 import java.util.Arrays
/**
 * Data broker implementation of {@link DataProviderService}.
 *
 * Reads are delegated to a {@link BindingAwareDataReaderRouter}. Commit handlers and data
 * change listeners are tracked in multimaps keyed by the instance identifier they were
 * registered for. Commits are executed asynchronously as {@link TwoPhaseCommit} tasks
 * submitted to {@code executor}.
 *
 * NOTE(review): both multimaps are plain HashMultimap instances that are mutated by the
 * register/remove methods and read by getActiveCommitHandlers, which TwoPhaseCommit calls
 * from the executor. No synchronization is visible here — confirm whether callers
 * serialize access.
 */
27 class DataBrokerImpl extends DeprecatedDataAPISupport implements DataProviderService {
// Executor that runs the commit tasks created in commit().
// NOTE(review): no initializer is visible here; presumably it is injected before the first
// commit — confirm, otherwise commit() fails with a NullPointerException.
30 var ExecutorService executor;
// Router that serves configuration/operational reads and holds the registered readers.
32 val dataReadRouter = new BindingAwareDataReaderRouter;
// Data change listener registrations, keyed by the path they were registered for.
34 Multimap<InstanceIdentifier, DataChangeListenerRegistration> listeners = HashMultimap.create();
// Commit handler registrations, keyed by the path they were registered for.
35 Multimap<InstanceIdentifier, DataCommitHandlerRegistration> commitHandlers = HashMultimap.create();
/**
 * Creates a new transaction bound to this broker.
 */
37 override beginTransaction() {
38 return new DataTransactionImpl(this);
/**
 * Reads configuration data for the given path by delegating to the read router.
 */
41 override readConfigurationData(InstanceIdentifier<? extends DataObject> path) {
42 return dataReadRouter.readConfigurationData(path);
/**
 * Reads operational data for the given path by delegating to the read router.
 */
45 override readOperationalData(InstanceIdentifier<? extends DataObject> path) {
46 return dataReadRouter.readOperationalData(path);
/**
 * Wraps the commit handler in a DataCommitHandlerRegistration and records it under the
 * given path.
 *
 * NOTE(review): the return statement is not visible in this view; presumably the created
 * registration is returned — confirm.
 */
49 override registerCommitHandler(InstanceIdentifier<? extends DataObject> path,
50 DataCommitHandler<InstanceIdentifier<? extends DataObject>, DataObject> commitHandler) {
51 val registration = new DataCommitHandlerRegistration(path,commitHandler,this);
52 commitHandlers.put(path,registration)
/**
 * Wraps the listener in a DataChangeListenerRegistration and records it under the given
 * path.
 *
 * NOTE(review): the return statement is not visible in this view; presumably the created
 * registration is returned — confirm.
 */
56 override registerDataChangeListener(InstanceIdentifier<? extends DataObject> path, DataChangeListener listener) {
57 val reg = new DataChangeListenerRegistration(path, listener, this);
58 listeners.put(path, reg);
/**
 * Registers the reader with the read router twice — once as a configuration reader and
 * once as an operational reader — and returns a single composite registration that wraps
 * the reader together with both router registrations.
 */
62 override registerDataReader(InstanceIdentifier<? extends DataObject> path,DataReader<InstanceIdentifier<? extends DataObject>,DataObject> reader) {
64 val confReg = dataReadRouter.registerConfigurationReader(path,reader);
65 val dataReg = dataReadRouter.registerOperationalReader(path,reader);
67 return new CompositeObjectRegistration(reader,Arrays.asList(confReg,dataReg));
/**
 * Drops a listener registration from the listener multimap. Called by
 * DataChangeListenerRegistration.removeRegistration().
 */
70 protected def removeListener(DataChangeListenerRegistration registration) {
71 listeners.remove(registration.path, registration);
/**
 * Drops a commit handler registration from the commit handler multimap. Called by
 * DataCommitHandlerRegistration.removeRegistration().
 */
74 protected def removeCommitHandler(DataCommitHandlerRegistration registration) {
75 commitHandlers.remove(registration.path, registration);
/**
 * Returns the set of handler instances behind all current commit handler registrations,
 * regardless of the path they were registered for. Collecting into a set collapses a
 * handler that is registered under several paths into a single element.
 */
78 protected def getActiveCommitHandlers() {
79 return commitHandlers.entries.map[ value.instance].toSet
/**
 * Submits the transaction for asynchronous commit.
 *
 * Rejects a null transaction, marks the transaction as SUBMITED, and hands a
 * TwoPhaseCommit task to the executor. Returns whatever the executor's submit() returns
 * for that task.
 */
82 protected def commit(DataTransactionImpl transaction) {
83 checkNotNull(transaction);
84 transaction.changeStatus(TransactionStatus.SUBMITED);
85 val task = new TwoPhaseCommit(transaction, this);
86 return executor.submit(task);
/**
 * Registration handle for a DataChangeListener registered with a DataBrokerImpl.
 * Removing the registration unregisters it from the owning broker.
 */
91 package class DataChangeListenerRegistration extends AbstractObjectRegistration<DataChangeListener> implements ListenerRegistration<DataChangeListener> {
// Broker this registration is removed from in removeRegistration().
93 DataBrokerImpl dataBroker;
// Path this registration belongs to; the broker reads it as the multimap key on removal.
96 val InstanceIdentifier<?> path;
// NOTE(review): the constructor body is not visible in this view; presumably it passes the
// listener instance to the superclass and stores broker and path — confirm.
98 new(InstanceIdentifier<?> path, DataChangeListener instance, DataBrokerImpl broker) {
/**
 * Removes this registration from the owning broker's listener multimap.
 */
104 override protected removeRegistration() {
105 dataBroker.removeListener(this);
/**
 * Registration handle for a DataCommitHandler registered with a DataBrokerImpl.
 * Removing the registration unregisters it from the owning broker.
 */
111 package class DataCommitHandlerRegistration //
112 extends AbstractObjectRegistration<DataCommitHandler<InstanceIdentifier<? extends DataObject>, DataObject>> {
// Broker this registration is removed from in removeRegistration().
114 DataBrokerImpl dataBroker;
// Path this registration belongs to; the broker reads it as the multimap key on removal.
117 val InstanceIdentifier<?> path;
// NOTE(review): the constructor body is not visible in this view; presumably it passes the
// handler instance to the superclass and stores broker and path — confirm.
119 new(InstanceIdentifier<?> path, DataCommitHandler<InstanceIdentifier<? extends DataObject>, DataObject> instance,
120 DataBrokerImpl broker) {
/**
 * Removes this registration from the owning broker's commit handler multimap.
 */
126 override protected removeRegistration() {
127 dataBroker.removeCommitHandler(this);
/**
 * Task that commits a single transaction through every commit handler currently
 * registered with the broker, in two phases:
 *
 * 1. every handler is asked for a commit sub-transaction via requestCommit();
 * 2. every sub-transaction obtained in phase one is finished via finish().
 *
 * If any handler throws in either phase, all sub-transactions obtained so far are rolled
 * back and a FAILED result is returned instead of propagating the exception.
 */
package class TwoPhaseCommit implements Callable<RpcResult<TransactionStatus>> {

    // Transaction being committed; handed to every handler's requestCommit().
    val DataTransactionImpl transaction;
    // Broker that supplies the set of active commit handlers.
    val DataBrokerImpl dataBroker;

    new(DataTransactionImpl transaction, DataBrokerImpl broker) {
        this.transaction = transaction;
        this.dataBroker = broker;
    }

    /**
     * Runs the two-phase commit.
     *
     * @return a successful result carrying COMMITED when every handler requested and
     *         finished its commit without throwing, otherwise the FAILED result produced
     *         by rollback()
     */
    override call() throws Exception {
        val Iterable<DataCommitHandler<InstanceIdentifier<? extends DataObject>, DataObject>> commitHandlers = dataBroker.activeCommitHandlers;

        // Phase one: request a commit from every handler. Sub-transactions are collected
        // as they are created so that a failure half-way can still roll back the ones
        // that were already opened.
        val List<DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject>> handlerTransactions = new ArrayList();
        try {
            for (handler : commitHandlers) {
                handlerTransactions.add(handler.requestCommit(transaction));
            }
        } catch (Exception e) {
            return rollback(handlerTransactions, e);
        }

        // Phase two: finish every sub-transaction.
        // NOTE(review): the result returned by finish() is not inspected; only a thrown
        // exception is treated as a failure.
        try {
            for (subtransaction : handlerTransactions) {
                subtransaction.finish();
            }
        } catch (Exception e) {
            return rollback(handlerTransactions, e);
        }

        return Rpcs.getRpcResult(true, TransactionStatus.COMMITED, Collections.emptySet());
    }

    /**
     * Rolls back every given sub-transaction and reports the commit as failed.
     *
     * Each rollback is attempted independently: a handler whose rollback() throws does not
     * prevent the remaining handlers from being rolled back. Such rollback failures are
     * attached to {@code e} as suppressed exceptions so they are not lost.
     *
     * @param transactions sub-transactions to roll back
     * @param e            failure that triggered the rollback; may be null, in which case
     *                     rollback failures are dropped
     * @return an unsuccessful result carrying FAILED and no errors
     */
    def rollback(List<DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject>> transactions, Exception e) {
        for (handlerTransaction : transactions) {
            try {
                handlerTransaction.rollback()
            } catch (Exception rollbackFailure) {
                // Keep going: the remaining handlers still have to be rolled back.
                if (e != null) {
                    e.addSuppressed(rollbackFailure)
                }
            }
        }

        // FIXME return encountered error.
        return Rpcs.getRpcResult(false, TransactionStatus.FAILED, Collections.emptySet());
    }
}