Merge "Unified Two Phase Commit implementation, fixed BA to BI connection"
[controller.git] / opendaylight / md-sal / sal-common-impl / src / main / java / org / opendaylight / controller / md / sal / common / impl / service / AbstractDataBroker.xtend
1 package org.opendaylight.controller.md.sal.common.impl.service
2
3 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler
4 import org.opendaylight.controller.md.sal.common.api.TransactionStatus
5 import org.opendaylight.controller.md.sal.common.api.data.DataReader
6 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration
7 import org.opendaylight.yangtools.concepts.ListenerRegistration
8 import com.google.common.collect.Multimap
9 import static com.google.common.base.Preconditions.*;
10 import java.util.List
11 import com.google.common.collect.HashMultimap
12 import java.util.concurrent.ExecutorService
13 import java.util.concurrent.Callable
14 import org.opendaylight.yangtools.yang.common.RpcResult
15 import java.util.Collections
16 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction
17 import java.util.ArrayList
18 import org.opendaylight.yangtools.concepts.CompositeObjectRegistration
19 import java.util.Arrays
20 import org.opendaylight.controller.md.sal.common.api.data.DataProvisionService
21 import org.opendaylight.controller.md.sal.common.api.data.DataModificationTransactionFactory
22 import org.opendaylight.controller.md.sal.common.api.data.DataChangePublisher
23 import org.opendaylight.controller.md.sal.common.api.data.DataChangeListener
24 import org.opendaylight.controller.sal.common.util.Rpcs
25 import org.opendaylight.controller.md.sal.common.impl.AbstractDataModification
26 import java.util.concurrent.Future
27 import org.opendaylight.controller.md.sal.common.impl.routing.AbstractDataReadRouter
28 import org.opendaylight.yangtools.concepts.Path
29 import org.slf4j.LoggerFactory
30
/**
 * Skeleton of a data broker: keeps per-path registrations of commit handlers
 * and data change listeners, delegates reads to the configured
 * {@code dataReadRouter}, and hands submitted transactions to the configured
 * {@code executor} wrapped in a {@link TwoPhaseCommit} task.
 *
 * NOTE(review): both registries are plain {@code HashMultimap}s, and the commit
 * handlers are read from the task submitted to the executor - confirm whether
 * registration and commit can race in practice.
 */
abstract class AbstractDataBroker<P extends Path<P>,D,DCL extends DataChangeListener<P,D>> implements 
DataModificationTransactionFactory<P, D>, //
DataReader<P, D>, //
DataChangePublisher<P, D, DCL>, //
DataProvisionService<P,D> {

    /** Executor to which commit tasks are submitted. */
    @Property
    var ExecutorService executor;

    /** Router that serves every configuration / operational read. */
    @Property
    var AbstractDataReadRouter<P,D> dataReadRouter;

    // Live registrations, keyed by the path they were registered under.
    Multimap<P, DataChangeListenerRegistration<P,D,DCL>> listeners = HashMultimap.create();
    Multimap<P, DataCommitHandlerRegistration<P,D>> commitHandlers = HashMultimap.create();


    public new() {
    }

    /** Reads configuration data for {@code path} through the read router. */
    override final readConfigurationData(P path) {
        dataReadRouter.readConfigurationData(path)
    }

    /** Reads operational data for {@code path} through the read router. */
    override final readOperationalData(P path) {
        dataReadRouter.readOperationalData(path)
    }

    /**
     * Registers {@code commitHandler} under {@code path}.
     *
     * @return the registration object that was stored for the handler
     */
    override final registerCommitHandler(P path,
        DataCommitHandler<P, D> commitHandler) {
        val handlerRegistration = new DataCommitHandlerRegistration(path, commitHandler, this)
        commitHandlers.put(path, handlerRegistration)
        handlerRegistration
    }

    /**
     * Registers {@code listener} under {@code path}.
     *
     * @return the registration object that was stored for the listener
     */
    override final def registerDataChangeListener(P path, DCL listener) {
        val listenerRegistration = new DataChangeListenerRegistration(path, listener, this)
        listeners.put(path, listenerRegistration)
        listenerRegistration
    }

    /**
     * Registers {@code reader} with the read router as both the configuration
     * and the operational reader for {@code path}.
     *
     * @return a composite registration wrapping both router registrations
     */
     final def registerDataReader(P path,DataReader<P,D> reader) {
        val configurationRegistration = dataReadRouter.registerConfigurationReader(path, reader)
        val operationalRegistration = dataReadRouter.registerOperationalReader(path, reader)
        new CompositeObjectRegistration(reader, Arrays.asList(configurationRegistration, operationalRegistration))
    }

    /** Drops a listener registration from the registry (called by the registration itself). */
    protected  final def removeListener(DataChangeListenerRegistration<P,D,DCL> registration) {
        listeners.remove(registration.path, registration)
    }

    /** Drops a commit handler registration from the registry (called by the registration itself). */
    protected  final def removeCommitHandler(DataCommitHandlerRegistration<P,D> registration) {
        commitHandlers.remove(registration.path, registration)
    }
    
    /** Returns the distinct commit handler instances of all current registrations, regardless of path. */
    protected  final def getActiveCommitHandlers() {
        commitHandlers.values.map[instance].toSet
    }

    /**
     * Marks {@code transaction} as SUBMITED and schedules its two-phase commit
     * on the executor.
     *
     * @param transaction transaction to commit; must not be null
     * @return future holding the result produced by the commit task
     */
    package final def Future<RpcResult<TransactionStatus>>  commit(AbstractDataTransaction<P,D> transaction) {
        checkNotNull(transaction)
        transaction.changeStatus(TransactionStatus.SUBMITED)
        executor.submit(new TwoPhaseCommit<P, D>(transaction, this))
    }

}
100
/**
 * Registration of a data change listener under a single path of an
 * {@link AbstractDataBroker}. Removing the registration unregisters it from
 * the broker it was created for.
 */
package class DataChangeListenerRegistration<P extends Path<P>,D,DCL extends DataChangeListener<P,D>> extends AbstractObjectRegistration<DCL> implements ListenerRegistration<DCL> {

    // Broker holding this registration; set to null once the registration was removed.
    AbstractDataBroker<P,D,DCL> dataBroker;

    /** Path under which the listener was registered. */
    @Property
    val P path;

    /**
     * @param path path the listener is registered under
     * @param instance the registered listener
     * @param broker broker that stores this registration
     */
    new(P path, DCL instance, AbstractDataBroker<P,D,DCL> broker) {
        super(instance)
        dataBroker = broker;
        _path = path;
    }

    /**
     * Unregisters this registration from its broker. Safe to invoke more than
     * once: after the first removal the broker reference is null and further
     * calls are no-ops instead of dereferencing null.
     */
    override protected removeRegistration() {
        if (dataBroker != null) {
            dataBroker.removeListener(this);
            dataBroker = null;
        }
    }

}
120
/**
 * Registration of a commit handler under a single path of an
 * {@link AbstractDataBroker}. Removing the registration unregisters it from
 * the broker it was created for.
 */
package class DataCommitHandlerRegistration<P extends Path<P>,D>
extends AbstractObjectRegistration<DataCommitHandler<P, D>> {

    // Broker holding this registration; set to null once the registration was removed.
    AbstractDataBroker<P,D,?> dataBroker;

    /** Path under which the commit handler was registered. */
    @Property
    val P path;

    /**
     * @param path path the handler is registered under
     * @param instance the registered commit handler
     * @param broker broker that stores this registration
     */
    new(P path, DataCommitHandler<P, D> instance,
        AbstractDataBroker<P,D,?> broker) {
        super(instance)
        dataBroker = broker;
        _path = path;
    }

    /**
     * Unregisters this registration from its broker. Safe to invoke more than
     * once: after the first removal the broker reference is null and further
     * calls are no-ops instead of dereferencing null.
     */
    override protected removeRegistration() {
        if (dataBroker != null) {
            dataBroker.removeCommitHandler(this);
            dataBroker = null;
        }
    }

}
142
/**
 * Task that commits one transaction against every commit handler currently
 * registered with the broker, in two phases: first each handler is asked for
 * a commit sub-transaction, then every sub-transaction is finished. A failure
 * in either phase rolls back all sub-transactions obtained so far.
 *
 * The transaction's status is updated to COMMITED or FAILED to match the
 * returned result, so it does not stay in the state set at submission.
 */
package class TwoPhaseCommit<P extends Path<P>,D> implements Callable<RpcResult<TransactionStatus>> {
    
    private static val log = LoggerFactory.getLogger(TwoPhaseCommit);

    val AbstractDataTransaction<P,D> transaction;
    val AbstractDataBroker<P,D,?> dataBroker;

    /**
     * @param transaction transaction to be committed
     * @param broker broker whose active commit handlers take part in the commit
     */
    new(AbstractDataTransaction<P,D> transaction, AbstractDataBroker<P,D,?> broker) {
        this.transaction = transaction;
        this.dataBroker = broker;
    }

    /**
     * Runs the two-phase commit.
     *
     * @return a successful result carrying COMMITED when both phases pass,
     *         otherwise the failed result produced by {@link #rollback}
     */
    override call() throws Exception {

        val Iterable<DataCommitHandler<P, D>> commitHandlers = dataBroker.activeCommitHandlers;

        // Phase 1: requesting commits
        val List<DataCommitTransaction<P, D>> handlerTransactions = new ArrayList();
        try {
            for (handler : commitHandlers) {
                handlerTransactions.add(handler.requestCommit(transaction));
            }
        } catch (Exception e) {
            log.error("Request Commit failded",e);
            return rollback(handlerTransactions,e);
        }

        // Phase 2: finishing the sub-transactions.
        // NOTE(review): the RpcResult returned by finish() is not inspected; only a
        // thrown exception is treated as failure.
        try {
            for (subtransaction : handlerTransactions) {
                subtransaction.finish();
            }
        } catch (Exception e) {
            log.error("Finish Commit failed",e);
            return rollback(handlerTransactions,e);
        }

        transaction.changeStatus(TransactionStatus.COMMITED);
        return Rpcs.getRpcResult(true, TransactionStatus.COMMITED, Collections.emptySet());
    }

    /**
     * Rolls back every given sub-transaction, marks the transaction FAILED and
     * builds the failed result.
     *
     * A rollback that throws is logged and does not prevent the remaining
     * sub-transactions from being rolled back.
     *
     * @param transactions sub-transactions obtained from the commit handlers so far
     * @param e the failure that triggered the rollback (currently not reported in the result)
     * @return an unsuccessful result carrying FAILED
     */
    def rollback(List<DataCommitTransaction<P, D>> transactions,Exception e) {
        for (handlerTransaction : transactions) {
            try {
                handlerTransaction.rollback()
            } catch (Exception rollbackFailure) {
                log.error("Rollback failed", rollbackFailure);
            }
        }
        transaction.changeStatus(TransactionStatus.FAILED);
        // FIXME return encountered error.
        return Rpcs.getRpcResult(false, TransactionStatus.FAILED, Collections.emptySet());
    }
}
190
/**
 * Base class of transactions created for an {@link AbstractDataBroker}.
 *
 * Each instance gets a fresh identifier object and starts in status NEW.
 * Reads and commit are forwarded to the owning broker; status changes are
 * reported to subclasses through {@link #onStatusChange}.
 */
public abstract class AbstractDataTransaction<P extends Path<P>, D> extends AbstractDataModification<P, D> {

    /** Identity token of this transaction; basis of hashCode and part of equals. */
    @Property
    private val Object identifier;

    // Current lifecycle status, updated only through changeStatus.
    var TransactionStatus status;

    // Broker this transaction reads from and commits to.
    var AbstractDataBroker<P, D, ?> broker;

    /**
     * @param dataBroker broker that owns this transaction; also passed to the superclass
     */
    protected new (AbstractDataBroker<P,D,?> dataBroker) {
        super(dataBroker)
        _identifier = new Object()
        broker = dataBroker
        status = TransactionStatus.NEW
        //listeners = new ListenerRegistry<>();
    }

    /** Submits this transaction to the broker for commit. */
    override  commit() {
        broker.commit(this)
    }

    /** Reads configuration data for {@code path} via the broker. */
    override readConfigurationData(P path) {
        broker.readConfigurationData(path)
    }

    /** Reads operational data for {@code path} via the broker. */
    override readOperationalData(P path) {
        broker.readOperationalData(path)
    }

    override hashCode() {
        identifier.hashCode()
    }

    /**
     * Two transactions are equal when they are of the same class and have
     * equal brokers and equal identifiers.
     */
    override equals(Object obj) {
        if (this === obj) {
            return true
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false
        }
        val that = obj as AbstractDataTransaction<P,D>
        // Xtend's == is a null-safe equals() comparison.
        return broker == that.broker && identifier == that.identifier
    }

    override TransactionStatus getStatus() {
        status
    }

    
    /** Hook invoked after every status change with the new status. */
    protected abstract def void onStatusChange(TransactionStatus status);
    
    /** Stores the new status and then notifies {@link #onStatusChange}. */
    public def changeStatus(TransactionStatus status) {
        this.status = status
        onStatusChange(status)
    }
    
}