Merge "Bug:129 Connection Manager Dashlet"
[controller.git] / opendaylight / md-sal / sal-binding-broker / src / main / java / org / opendaylight / controller / sal / binding / impl / DataBrokerImpl.xtend
1 package org.opendaylight.controller.sal.binding.impl
2
3 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler
4 import org.opendaylight.controller.sal.binding.api.data.DataChangeListener
5 import org.opendaylight.controller.sal.binding.api.data.DataProviderService
6 import org.opendaylight.yangtools.yang.binding.DataObject
7 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier
8 import org.opendaylight.controller.md.sal.common.api.TransactionStatus
9 import org.opendaylight.controller.md.sal.common.api.data.DataReader
10 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration
11 import org.opendaylight.yangtools.concepts.ListenerRegistration
12 import com.google.common.collect.Multimap
13 import static com.google.common.base.Preconditions.*;
14 import java.util.List
15 import com.google.common.collect.HashMultimap
16 import java.util.concurrent.ExecutorService
17 import java.util.concurrent.Callable
18 import org.opendaylight.yangtools.yang.common.RpcResult
19 import org.opendaylight.controller.sal.common.util.Rpcs
20 import java.util.Collections
21 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction
22 import java.util.ArrayList
23 import org.opendaylight.controller.sal.binding.impl.util.BindingAwareDataReaderRouter
24 import org.opendaylight.yangtools.concepts.CompositeObjectRegistration
25 import java.util.Arrays
26
/**
 * Binding-aware implementation of {@link DataProviderService}.
 *
 * Reads are delegated to a {@link BindingAwareDataReaderRouter}. Data change
 * listeners and commit handlers are tracked per path in multimaps, and
 * transactions are committed by submitting a {@link TwoPhaseCommit} task to
 * the configured executor.
 */
class DataBrokerImpl extends DeprecatedDataAPISupport implements DataProviderService {

    /** Executor on which commit tasks are run; has to be set before {@code commit} is invoked. */
    @Property
    var ExecutorService executor;

    val dataReadRouter = new BindingAwareDataReaderRouter;

    // Registrations keyed by the path they were registered for.
    // NOTE(review): these are plain HashMultimaps; commit tasks read commitHandlers from
    // whatever thread the executor uses - confirm that callers serialize access.
    Multimap<InstanceIdentifier, DataChangeListenerRegistration> listeners = HashMultimap.create();
    Multimap<InstanceIdentifier, DataCommitHandlerRegistration> commitHandlers = HashMultimap.create();

    /** Creates a new transaction bound to this broker. */
    override beginTransaction() {
        return new DataTransactionImpl(this);
    }

    /** Reads configuration data for {@code path} through the read router. */
    override readConfigurationData(InstanceIdentifier<? extends DataObject> path) {
        return dataReadRouter.readConfigurationData(path);
    }

    /** Reads operational data for {@code path} through the read router. */
    override readOperationalData(InstanceIdentifier<? extends DataObject> path) {
        return dataReadRouter.readOperationalData(path);
    }

    /**
     * Registers {@code commitHandler} under {@code path}.
     *
     * @return the registration object that was stored for the handler
     */
    override registerCommitHandler(InstanceIdentifier<? extends DataObject> path,
        DataCommitHandler<InstanceIdentifier<? extends DataObject>, DataObject> commitHandler) {
        val registration = new DataCommitHandlerRegistration(path, commitHandler, this);
        commitHandlers.put(path, registration)
        return registration;
    }

    /**
     * Registers {@code listener} under {@code path}.
     *
     * @return the registration object that was stored for the listener
     */
    override registerDataChangeListener(InstanceIdentifier<? extends DataObject> path, DataChangeListener listener) {
        val reg = new DataChangeListenerRegistration(path, listener, this);
        listeners.put(path, reg);
        return reg;
    }

    /**
     * Registers {@code reader} with the read router as both a configuration
     * reader and an operational reader for {@code path}.
     *
     * @return a composite registration wrapping the two router registrations
     */
    override registerDataReader(InstanceIdentifier<? extends DataObject> path,DataReader<InstanceIdentifier<? extends DataObject>,DataObject> reader) {

        val confReg = dataReadRouter.registerConfigurationReader(path,reader);
        val dataReg = dataReadRouter.registerOperationalReader(path,reader);

        return new CompositeObjectRegistration(reader,Arrays.asList(confReg,dataReg));
    }

    /** Drops {@code registration} from the listener map; called by the registration itself. */
    protected def removeListener(DataChangeListenerRegistration registration) {
        listeners.remove(registration.path, registration);
    }

    /** Drops {@code registration} from the commit handler map; called by the registration itself. */
    protected def removeCommitHandler(DataCommitHandlerRegistration registration) {
        commitHandlers.remove(registration.path, registration);
    }

    /**
     * Returns the distinct commit handler instances of all current
     * registrations, copied into a set.
     */
    protected def getActiveCommitHandlers() {
        return commitHandlers.values.map[instance].toSet
    }

    /**
     * Marks {@code transaction} as submitted and queues a two-phase commit
     * for it on the executor.
     *
     * @return the future returned by the executor for the commit task
     * @throws NullPointerException if {@code transaction} is null or no executor has been set
     */
    protected def commit(DataTransactionImpl transaction) {
        checkNotNull(transaction);
        // Validate the executor before touching the transaction so a missing executor
        // cannot leave the transaction in SUBMITED state with no task queued.
        val commitExecutor = checkNotNull(executor, "Executor must be set before committing transactions.");
        transaction.changeStatus(TransactionStatus.SUBMITED);
        val task = new TwoPhaseCommit(transaction, this);
        return commitExecutor.submit(task);
    }

}
90
/**
 * Registration of a {@link DataChangeListener} for a path, owned by a
 * {@link DataBrokerImpl}. Removing the registration deregisters it from the
 * broker and drops the reference to the broker.
 */
package class DataChangeListenerRegistration extends AbstractObjectRegistration<DataChangeListener> implements ListenerRegistration<DataChangeListener> {

    // Owning broker; null once the registration has been removed.
    DataBrokerImpl dataBroker;

    /** Path the listener was registered for. */
    @Property
    val InstanceIdentifier<?> path;

    new(InstanceIdentifier<?> path, DataChangeListener instance, DataBrokerImpl broker) {
        super(instance)
        dataBroker = broker;
        _path = path;
    }

    /**
     * Deregisters this registration from the broker. Repeated invocations are
     * no-ops: the broker reference is cleared on the first one, so it must
     * not be dereferenced again.
     */
    override protected removeRegistration() {
        if (dataBroker != null) {
            dataBroker.removeListener(this);
            dataBroker = null;
        }
    }

}
110
/**
 * Registration of a {@link DataCommitHandler} for a path, owned by a
 * {@link DataBrokerImpl}. Removing the registration deregisters it from the
 * broker and drops the reference to the broker.
 */
package class DataCommitHandlerRegistration //
extends AbstractObjectRegistration<DataCommitHandler<InstanceIdentifier<? extends DataObject>, DataObject>> {

    // Owning broker; null once the registration has been removed.
    DataBrokerImpl dataBroker;

    /** Path the commit handler was registered for. */
    @Property
    val InstanceIdentifier<?> path;

    new(InstanceIdentifier<?> path, DataCommitHandler<InstanceIdentifier<? extends DataObject>, DataObject> instance,
        DataBrokerImpl broker) {
        super(instance)
        dataBroker = broker;
        _path = path;
    }

    /**
     * Deregisters this registration from the broker. Repeated invocations are
     * no-ops: the broker reference is cleared on the first one, so it must
     * not be dereferenced again.
     */
    override protected removeRegistration() {
        if (dataBroker != null) {
            dataBroker.removeCommitHandler(this);
            dataBroker = null;
        }
    }

}
132
/**
 * Commit task for a single transaction. It asks every commit handler that is
 * active on the broker to prepare a handler transaction, then finishes all of
 * them; if either step throws, every handler transaction obtained so far is
 * rolled back and a failed result is returned.
 */
package class TwoPhaseCommit implements Callable<RpcResult<TransactionStatus>> {

    val DataTransactionImpl transaction;
    val DataBrokerImpl dataBroker;

    new(DataTransactionImpl transaction, DataBrokerImpl broker) {
        this.transaction = transaction;
        this.dataBroker = broker;
    }

    /**
     * Runs the commit.
     *
     * @return a successful result carrying {@code COMMITED}, or an
     *         unsuccessful result carrying {@code FAILED} after a rollback
     * @throws Exception if rolling back a handler transaction itself fails
     */
    override call() throws Exception {

        val Iterable<DataCommitHandler<InstanceIdentifier<? extends DataObject>, DataObject>> commitHandlers = dataBroker.activeCommitHandlers;

        // Phase one: request a commit from every handler.
        val List<DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject>> handlerTransactions = new ArrayList();
        try {
            for (handler : commitHandlers) {
                handlerTransactions.add(handler.requestCommit(transaction));
            }
        } catch (Exception e) {
            return rollback(handlerTransactions, e);
        }

        // Phase two: finish every handler transaction.
        // NOTE(review): the results returned by finish() are not inspected; only an
        // exception is treated as a failure.
        try {
            for (subtransaction : handlerTransactions) {
                subtransaction.finish();
            }
        } catch (Exception e) {
            return rollback(handlerTransactions, e);
        }

        return Rpcs.getRpcResult(true, TransactionStatus.COMMITED, Collections.emptySet());
    }

    /**
     * Rolls back all given handler transactions and reports the commit as
     * failed. Every transaction gets a rollback attempt even when an earlier
     * rollback throws; the first such failure is rethrown once all attempts
     * have been made.
     *
     * @param transactions handler transactions obtained before the failure
     * @param e the exception that caused the rollback (currently not reported)
     * @return an unsuccessful result carrying {@code FAILED}
     */
    def rollback(List<DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject>> transactions,Exception e) {
        var Exception rollbackFailure = null;
        for (handlerTransaction : transactions) {
            try {
                handlerTransaction.rollback()
            } catch (Exception failure) {
                // Remember the first failure but keep going so that the remaining
                // handlers still get a chance to roll back.
                if (rollbackFailure == null) {
                    rollbackFailure = failure;
                }
            }
        }
        if (rollbackFailure != null) {
            throw rollbackFailure;
        }
        // FIXME return encountered error.
        return Rpcs.getRpcResult(false, TransactionStatus.FAILED, Collections.emptySet());
    }
}