1 package org.opendaylight.controller.md.sal.common.impl.service
3 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler
4 import org.opendaylight.controller.md.sal.common.api.TransactionStatus
5 import org.opendaylight.controller.md.sal.common.api.data.DataReader
6 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration
7 import org.opendaylight.yangtools.concepts.ListenerRegistration
8 import com.google.common.collect.Multimap
9 import static com.google.common.base.Preconditions.*;
11 import com.google.common.collect.HashMultimap
12 import java.util.concurrent.ExecutorService
13 import java.util.concurrent.Callable
14 import org.opendaylight.yangtools.yang.common.RpcResult
15 import java.util.Collections
16 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction
17 import java.util.ArrayList
18 import org.opendaylight.yangtools.concepts.CompositeObjectRegistration
19 import java.util.Arrays
20 import org.opendaylight.controller.md.sal.common.api.data.DataProvisionService
21 import org.opendaylight.controller.md.sal.common.api.data.DataModificationTransactionFactory
22 import org.opendaylight.controller.md.sal.common.api.data.DataChangePublisher
23 import org.opendaylight.controller.md.sal.common.api.data.DataChangeListener
24 import org.opendaylight.controller.sal.common.util.Rpcs
25 import org.opendaylight.controller.md.sal.common.impl.AbstractDataModification
26 import java.util.concurrent.Future
27 import org.opendaylight.controller.md.sal.common.impl.routing.AbstractDataReadRouter
28 import org.opendaylight.yangtools.concepts.Path
29 import org.slf4j.LoggerFactory
31 abstract class AbstractDataBroker<P extends Path<P>,D,DCL extends DataChangeListener<P,D>> implements
32 DataModificationTransactionFactory<P, D>, //
34 DataChangePublisher<P, D, DCL>, //
35 DataProvisionService<P,D> {
38 var ExecutorService executor;
41 var AbstractDataReadRouter<P,D> dataReadRouter;
43 Multimap<P, DataChangeListenerRegistration<P,D,DCL>> listeners = HashMultimap.create();
44 Multimap<P, DataCommitHandlerRegistration<P,D>> commitHandlers = HashMultimap.create();
51 override final readConfigurationData(P path) {
52 return dataReadRouter.readConfigurationData(path);
55 override final readOperationalData(P path) {
56 return dataReadRouter.readOperationalData(path);
59 override final registerCommitHandler(P path,
60 DataCommitHandler<P, D> commitHandler) {
61 val registration = new DataCommitHandlerRegistration(path,commitHandler,this);
62 commitHandlers.put(path,registration)
66 override final def registerDataChangeListener(P path, DCL listener) {
67 val reg = new DataChangeListenerRegistration(path, listener, this);
68 listeners.put(path, reg);
72 final def registerDataReader(P path,DataReader<P,D> reader) {
74 val confReg = dataReadRouter.registerConfigurationReader(path,reader);
75 val dataReg = dataReadRouter.registerOperationalReader(path,reader);
77 return new CompositeObjectRegistration(reader,Arrays.asList(confReg,dataReg));
80 protected final def removeListener(DataChangeListenerRegistration<P,D,DCL> registration) {
81 listeners.remove(registration.path, registration);
84 protected final def removeCommitHandler(DataCommitHandlerRegistration<P,D> registration) {
85 commitHandlers.remove(registration.path, registration);
88 protected final def getActiveCommitHandlers() {
89 return commitHandlers.entries.map[ value.instance].toSet
92 package final def Future<RpcResult<TransactionStatus>> commit(AbstractDataTransaction<P,D> transaction) {
93 checkNotNull(transaction);
94 transaction.changeStatus(TransactionStatus.SUBMITED);
95 val task = new TwoPhaseCommit(transaction, this);
96 return executor.submit(task);
101 package class DataChangeListenerRegistration<P extends Path<P>,D,DCL extends DataChangeListener<P,D>> extends AbstractObjectRegistration<DCL> implements ListenerRegistration<DCL> {
103 AbstractDataBroker<P,D,DCL> dataBroker;
108 new(P path, DCL instance, AbstractDataBroker<P,D,DCL> broker) {
114 override protected removeRegistration() {
115 dataBroker.removeListener(this);
121 package class DataCommitHandlerRegistration<P extends Path<P>,D>
122 extends AbstractObjectRegistration<DataCommitHandler<P, D>> {
124 AbstractDataBroker<P,D,?> dataBroker;
129 new(P path, DataCommitHandler<P, D> instance,
130 AbstractDataBroker<P,D,?> broker) {
136 override protected removeRegistration() {
137 dataBroker.removeCommitHandler(this);
143 package class TwoPhaseCommit<P extends Path<P>,D> implements Callable<RpcResult<TransactionStatus>> {
145 private static val log = LoggerFactory.getLogger(TwoPhaseCommit);
147 val AbstractDataTransaction<P,D> transaction;
148 val AbstractDataBroker<P,D,?> dataBroker;
150 new(AbstractDataTransaction<P,D> transaction, AbstractDataBroker<P,D,?> broker) {
151 this.transaction = transaction;
152 this.dataBroker = broker;
155 override call() throws Exception {
157 val Iterable<DataCommitHandler<P, D>> commitHandlers = dataBroker.activeCommitHandlers;
159 // requesting commits
160 val List<DataCommitTransaction<P, D>> handlerTransactions = new ArrayList();
162 for (handler : commitHandlers) {
163 handlerTransactions.add(handler.requestCommit(transaction));
165 } catch (Exception e) {
166 log.error("Request Commit failded",e);
167 return rollback(handlerTransactions,e);
169 val List<RpcResult<Void>> results = new ArrayList();
171 for (subtransaction : handlerTransactions) {
172 results.add(subtransaction.finish());
174 } catch (Exception e) {
175 log.error("Finish Commit failed",e);
176 return rollback(handlerTransactions,e);
179 return Rpcs.getRpcResult(true, TransactionStatus.COMMITED, Collections.emptySet());
182 def rollback(List<DataCommitTransaction<P, D>> transactions,Exception e) {
183 for (transaction : transactions) {
184 transaction.rollback()
186 // FIXME return encountered error.
187 return Rpcs.getRpcResult(false, TransactionStatus.FAILED, Collections.emptySet());
191 public abstract class AbstractDataTransaction<P extends Path<P>, D> extends AbstractDataModification<P, D> {
194 private val Object identifier;
197 var TransactionStatus status;
200 var AbstractDataBroker<P, D, ?> broker;
202 protected new (AbstractDataBroker<P,D,?> dataBroker) {
204 _identifier = new Object();
206 status = TransactionStatus.NEW;
207 //listeners = new ListenerRegistry<>();
211 return broker.commit(this);
214 override readConfigurationData(P path) {
215 return broker.readConfigurationData(path);
218 override readOperationalData(P path) {
219 return broker.readOperationalData(path);
222 override hashCode() {
223 return identifier.hashCode;
226 override equals(Object obj) {
231 if (getClass() != obj.getClass())
233 val other = (obj as AbstractDataTransaction<P,D>) ;
234 if (broker == null) {
235 if (other.broker != null)
237 } else if (!broker.equals(other.broker))
239 if (identifier == null) {
240 if (other.identifier != null)
242 } else if (!identifier.equals(other.identifier))
247 override TransactionStatus getStatus() {
252 protected abstract def void onStatusChange(TransactionStatus status);
254 public def changeStatus(TransactionStatus status) {
255 this.status = status;
256 onStatusChange(status);