1 package org.opendaylight.controller.md.sal.common.impl.service
3 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler
4 import org.opendaylight.controller.md.sal.common.api.TransactionStatus
5 import org.opendaylight.controller.md.sal.common.api.data.DataReader
6 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration
7 import org.opendaylight.yangtools.concepts.ListenerRegistration
8 import com.google.common.collect.Multimap
9 import static com.google.common.base.Preconditions.*;
11 import com.google.common.collect.HashMultimap
12 import java.util.concurrent.ExecutorService
13 import java.util.concurrent.Callable
14 import org.opendaylight.yangtools.yang.common.RpcResult
15 import java.util.Collections
16 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction
17 import java.util.ArrayList
18 import org.opendaylight.yangtools.concepts.CompositeObjectRegistration
19 import java.util.Arrays
20 import org.opendaylight.controller.md.sal.common.api.data.DataProvisionService
21 import org.opendaylight.controller.md.sal.common.api.data.DataModificationTransactionFactory
22 import org.opendaylight.controller.md.sal.common.api.data.DataChangePublisher
23 import org.opendaylight.controller.md.sal.common.api.data.DataChangeListener
24 import org.opendaylight.controller.sal.common.util.Rpcs
25 import org.opendaylight.controller.md.sal.common.impl.AbstractDataModification
26 import java.util.concurrent.Future
27 import org.opendaylight.controller.md.sal.common.impl.routing.AbstractDataReadRouter
28 import org.opendaylight.yangtools.concepts.Path
29 import org.slf4j.LoggerFactory
30 import java.util.HashSet
31 import java.util.Collection
32 import com.google.common.collect.FluentIterable;
34 import com.google.common.collect.ImmutableList
35 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration
36 import org.opendaylight.controller.md.sal.common.api.RegistrationListener
37 import org.opendaylight.yangtools.concepts.util.ListenerRegistry
38 import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent
// Base broker for a data tree addressed by paths of type P, carrying data of type D and
// notifying change listeners of type DCL. The fields below are the registries that the
// registration, removal and commit methods of this class read and update.
40 abstract class AbstractDataBroker<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> implements DataModificationTransactionFactory<P, D>, //
42 DataChangePublisher<P, D, DCL>, //
43 DataProvisionService<P, D> {
45 private static val LOG = LoggerFactory.getLogger(AbstractDataBroker);
// Executor that commit(...) submits TwoPhaseCommit tasks to.
48 var ExecutorService executor;
// Router that serves configuration/operational reads and that data readers are registered with.
51 var AbstractDataReadRouter<P, D> dataReadRouter;
// Data change listener registrations, keyed by the path they were registered for.
53 Multimap<P, DataChangeListenerRegistration<P, D, DCL>> listeners = HashMultimap.create();
// Commit handler registrations, keyed by the path they were registered for.
54 Multimap<P, DataCommitHandlerRegistrationImpl<P, D>> commitHandlers = HashMultimap.create();
// Listeners that are told about every commit handler registration and removal.
56 val ListenerRegistry<RegistrationListener<DataCommitHandlerRegistration<P,D>>> commitHandlerRegistrationListeners = new ListenerRegistry();
// Selects the commit handlers concerned by a set of changed paths: keeps the commitHandlers
// entries whose key passes isAffectedBy(paths), flattens their registrations and returns the
// registered handler instances as a list.
// NOTE(review): the inline type comment in the signature names listener registrations and does
// not describe this result — it looks copied from affectedListenersWithInitialState; confirm.
// NOTE(review): the line declaring `paths` is not visible here.
60 protected def /*Iterator<Entry<Collection<DataChangeListenerRegistration<P,D,DCL>>,D>>*/ affectedCommitHandlers(
62 return FluentIterable.from(commitHandlers.asMap.entrySet).filter[key.isAffectedBy(paths)] //
63 .transformAndConcat[value] //
64 .transform[instance].toList()
// Reads configuration data for the given path by delegating to the data read router.
67 override final readConfigurationData(P path) {
68 return dataReadRouter.readConfigurationData(path);
// Reads operational data for the given path by delegating to the data read router.
71 override final readOperationalData(P path) {
72 return dataReadRouter.readOperationalData(path);
// Registers a commit handler for a path: wraps it in a DataCommitHandlerRegistrationImpl bound to
// this broker, stores the registration under the path, logs it and notifies the registration
// listeners.
// NOTE(review): the value returned to the caller is not visible here — presumably the
// registration; confirm.
75 override final registerCommitHandler(P path, DataCommitHandler<P, D> commitHandler) {
76 val registration = new DataCommitHandlerRegistrationImpl(path, commitHandler, this);
77 commitHandlers.put(path, registration)
78 LOG.info("Registering Commit Handler {} for path: {}",commitHandler,path);
// Tell every commit-handler registration listener about the new registration.
79 for(listener : commitHandlerRegistrationListeners) {
81 listener.instance.onRegister(registration);
// A failure in one listener is logged per listener (the catch refers to the loop variable),
// so it does not abort the notification of the others.
82 } catch (Exception e) {
83 LOG.error("Unexpected exception in listener {} during invoking onRegister",listener.instance,e);
// Registers a data change listener for a path and immediately hands it an initial event built
// from the configuration and operational data currently readable at that path.
// NOTE(review): the returned value is not visible here — presumably `reg`; confirm.
89 override final def registerDataChangeListener(P path, DCL listener) {
90 val reg = new DataChangeListenerRegistration(path, listener, this);
91 listeners.put(path, reg);
// Snapshot of the current state; it is read after the registration has been stored.
92 val initialConfig = dataReadRouter.readConfigurationData(path);
93 val initialOperational = dataReadRouter.readOperationalData(path);
94 val event = createInitialListenerEvent(path,initialConfig,initialOperational);
// Delivered directly from this method; no exception handling is visible around this call.
95 listener.onDataChanged(event);
// Registers one reader with the data read router for both configuration and operational reads
// of the given path, and returns a single composite registration wrapping both.
99 final def registerDataReader(P path, DataReader<P, D> reader) {
101 val confReg = dataReadRouter.registerConfigurationReader(path, reader);
102 val dataReg = dataReadRouter.registerOperationalReader(path, reader);
104 return new CompositeObjectRegistration(reader, Arrays.asList(confReg, dataReg));
// Adds a listener that registerCommitHandler and removeCommitHandler notify about commit handler
// registrations and removals.
// NOTE(review): the return statement is not visible here — presumably `ret`; confirm.
107 override registerCommitHandlerListener(RegistrationListener<DataCommitHandlerRegistration<P, D>> commitHandlerListener) {
108 val ret = commitHandlerRegistrationListeners.register(commitHandlerListener);
// Builds the event handed to a data change listener right after it registers, from the initial
// configuration and operational data. The `path` argument is not used by the visible body.
113 protected def DataChangeEvent<P,D> createInitialListenerEvent(P path,D initialConfig,D initialOperational) {
114 return new InitialDataChangeEventImpl<P, D>(initialConfig,initialOperational);
// Removes a data change listener registration from the listeners map, using the path stored in
// the registration. Called from DataChangeListenerRegistration.removeRegistration().
118 protected final def removeListener(DataChangeListenerRegistration<P, D, DCL> registration) {
119 listeners.remove(registration.path, registration);
// Removes a commit handler registration from the commitHandlers map, logs the removal and
// notifies the registration listeners. Called from
// DataCommitHandlerRegistrationImpl.removeRegistration().
122 protected final def removeCommitHandler(DataCommitHandlerRegistrationImpl<P, D> registration) {
123 commitHandlers.remove(registration.path, registration);
125 LOG.info("Removing Commit Handler {} for path: {}",registration.instance,registration.path);
// Tell every commit-handler registration listener that the handler is gone.
126 for(listener : commitHandlerRegistrationListeners) {
128 listener.instance.onUnregister(registration);
// A failure in one listener is logged per listener (the catch refers to the loop variable),
// so it does not abort the notification of the others.
129 } catch (Exception e) {
130 LOG.error("Unexpected exception in listener {} during invoking onUnregister",listener.instance,e);
// Returns the (path, registration) entries of the commitHandlers multimap.
// NOTE(review): this is the multimap's own entries collection, not a copy made here — confirm
// callers do not rely on a stable snapshot.
135 protected final def getActiveCommitHandlers() {
136 return commitHandlers.entries;
// For every listened path that passes isAffectedBy(paths), reads the operational and
// configuration data at that path and packs them, together with the path and its listener
// registrations, into a ListenerStateCapture. TwoPhaseCommit.call() invokes this before
// requesting commits from the handlers.
// NOTE(review): the line declaring `paths` and the end of the chain are not visible here, so
// the exact return type cannot be confirmed.
139 protected def /*Iterator<Entry<Collection<DataChangeListenerRegistration<P,D,DCL>>,D>>*/ affectedListenersWithInitialState(
141 return FluentIterable.from(listeners.asMap.entrySet).filter[key.isAffectedBy(paths)].transform [
142 val operationalState = readOperationalData(key)
143 val configurationState = readConfigurationData(key)
144 return new ListenerStateCapture(key, value, operationalState, configurationState)
// Decides whether a registration key is concerned by a set of changed paths.
148 protected def boolean isAffectedBy(P key, Set<P> paths) {
// Exact match: the key itself is one of the changed paths.
149 if (paths.contains(key)) {
// Containment check of the key against a single changed path.
// NOTE(review): the loop binding `path` and the returned values are not visible here —
// presumably this iterates over `paths`; confirm.
153 if (key.contains(path)) {
// Starts the commit of a transaction: rejects a null transaction (checkNotNull), moves the
// transaction to status SUBMITED, wraps it in a TwoPhaseCommit task and submits that task to the
// executor. Returns the Future produced by the submission.
161 package final def Future<RpcResult<TransactionStatus>> commit(AbstractDataTransaction<P, D> transaction) {
162 checkNotNull(transaction);
163 transaction.changeStatus(TransactionStatus.SUBMITED);
164 val task = new TwoPhaseCommit(transaction, this);
165 return executor.submit(task);
// Snapshot for one listened path, built by AbstractDataBroker.affectedListenersWithInitialState:
// the listener registrations for that path plus the operational and configuration data read
// when the snapshot was taken.
// NOTE(review): the `path` member read by TwoPhaseCommit.publishDataChangeEvent and the
// constructor are not visible here.
171 package class ListenerStateCapture<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> {
// Listener registrations that were registered for the captured path.
177 Collection<DataChangeListenerRegistration<P, D, DCL>> listeners;
// Operational data read at the captured path when the snapshot was taken.
180 D initialOperationalState;
// Configuration data read at the captured path when the snapshot was taken.
183 D initialConfigurationState;
// Registration handle for a data change listener, created by
// AbstractDataBroker.registerDataChangeListener.
186 package class DataChangeListenerRegistration<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> extends AbstractObjectRegistration<DCL> implements ListenerRegistration<DCL> {
// Broker that is called back when this registration is removed.
188 AbstractDataBroker<P, D, DCL> dataBroker;
// NOTE(review): the constructor body is not visible here — presumably it stores the path and
// the broker; confirm.
193 new(P path, DCL instance, AbstractDataBroker<P, D, DCL> broker) {
// Override of removeRegistration: drops this registration from the broker's listener map.
199 override protected removeRegistration() {
200 dataBroker.removeListener(this);
// Registration handle for a commit handler, created by AbstractDataBroker.registerCommitHandler.
206 package class DataCommitHandlerRegistrationImpl<P extends Path<P>, D> //
207 extends AbstractObjectRegistration<DataCommitHandler<P, D>> //
208 implements DataCommitHandlerRegistration<P, D> {
// Broker that is called back when this registration is removed.
210 AbstractDataBroker<P, D, ?> dataBroker;
// NOTE(review): the constructor body is not visible here — presumably it stores the path and
// the broker; confirm.
215 new(P path, DataCommitHandler<P, D> instance, AbstractDataBroker<P, D, ?> broker) {
// Override of removeRegistration: removes this registration from the broker, which also
// notifies the commit-handler registration listeners (see removeCommitHandler).
221 override protected removeRegistration() {
222 dataBroker.removeCommitHandler(this);
// Callable that commits one transaction against the commit handlers registered in a broker.
// AbstractDataBroker.commit creates it and submits it to the broker's executor.
227 package class TwoPhaseCommit<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> implements Callable<RpcResult<TransactionStatus>> {
229 private static val log = LoggerFactory.getLogger(TwoPhaseCommit);
// Transaction whose changes are being committed.
231 val AbstractDataTransaction<P, D> transaction;
// Broker used to look up affected handlers and listeners and to read data.
232 val AbstractDataBroker<P, D, DCL> dataBroker;
// Stores the transaction to commit and the broker it is committed through.
234 new(AbstractDataTransaction<P, D> transaction, AbstractDataBroker<P, D, DCL> broker) {
235 this.transaction = transaction;
236 this.dataBroker = broker;
// Commits the transaction in two phases across all affected commit handlers.
// Order of work: collect the affected paths, capture listener state for those paths, request a
// commit from every affected handler, finish every handler transaction, publish change events.
// A failure while requesting or finishing is logged and answered with rollback(...); otherwise
// a successful result with status COMMITED is returned.
239 override call() throws Exception {
241 // get affected paths
242 val affectedPaths = new HashSet<P>();
// Configuration changes: created and updated entries contribute their keys, removals are paths.
244 affectedPaths.addAll(transaction.createdConfigurationData.keySet);
245 affectedPaths.addAll(transaction.updatedConfigurationData.keySet);
246 affectedPaths.addAll(transaction.removedConfigurationData);
// Operational changes, collected the same way.
248 affectedPaths.addAll(transaction.createdOperationalData.keySet);
249 affectedPaths.addAll(transaction.updatedOperationalData.keySet);
250 affectedPaths.addAll(transaction.removedOperationalData);
// Capture listener state before any handler is asked to commit, so that the change events can
// carry the pre-commit data.
252 val listeners = dataBroker.affectedListenersWithInitialState(affectedPaths);
254 val transactionId = transaction.identifier;
256 log.info("Transaction: {} Started.",transactionId);
257 // requesting commits
258 val Iterable<DataCommitHandler<P, D>> commitHandlers = dataBroker.affectedCommitHandlers(affectedPaths);
// Handler transactions obtained so far; this is what gets rolled back on failure.
259 val List<DataCommitTransaction<P, D>> handlerTransactions = new ArrayList();
// Phase 1: every affected handler is asked to prepare a commit for this transaction.
261 for (handler : commitHandlers) {
262 handlerTransactions.add(handler.requestCommit(transaction));
264 } catch (Exception e) {
265 log.error("Transaction: {} Request Commit failed", transactionId,e);
266 return rollback(handlerTransactions, e);
// Results of the finish calls; collected here but not read in the visible code.
268 val List<RpcResult<Void>> results = new ArrayList();
// Phase 2: every prepared handler transaction is finished.
270 for (subtransaction : handlerTransactions) {
271 results.add(subtransaction.finish());
// NOTE(review): publishing appears to sit inside the same try as the finish loop, so an
// exception escaping it would trigger rollback after handlers already finished — confirm.
273 listeners.publishDataChangeEvent();
274 } catch (Exception e) {
275 log.error("Transaction: {} Finish Commit failed",transactionId, e);
276 return rollback(handlerTransactions, e);
278 log.info("Transaction: {} Finished succesfully.",transactionId);
279 return Rpcs.getRpcResult(true, TransactionStatus.COMMITED, Collections.emptySet());
// Delivers a change event to the listeners of every captured path: re-reads configuration and
// operational data for the path from the broker, combines it with the state captured earlier
// and calls onDataChanged on each registered listener instance.
283 def void publishDataChangeEvent(ImmutableList<ListenerStateCapture<P, D, DCL>> listeners) {
284 for (listenerSet : listeners) {
// Data as readable from the broker at publication time.
285 val updatedConfiguration = dataBroker.readConfigurationData(listenerSet.path);
286 val updatedOperational = dataBroker.readOperationalData(listenerSet.path);
// NOTE(review): the initial values are passed configuration-first but the updated values
// operational-first — confirm this matches the DataChangeEventImpl constructor.
288 val changeEvent = new DataChangeEventImpl(transaction, listenerSet.initialConfigurationState,
289 listenerSet.initialOperationalState, updatedOperational, updatedConfiguration);
// The same event object is handed to every listener of this path.
290 for (listener : listenerSet.listeners) {
292 listener.instance.onDataChanged(changeEvent);
// NOTE(review): what is done with a listener's exception is not visible here.
294 } catch (Exception e) {
// Rolls back every handler transaction in the given list and returns an unsuccessful result
// with status FAILED. The exception `e` that caused the rollback is not used by the visible code
// and is not part of the returned result (see the FIXME below).
301 def rollback(List<DataCommitTransaction<P, D>> transactions, Exception e) {
302 for (transaction : transactions) {
303 transaction.rollback()
306 // FIXME return encountered error.
307 return Rpcs.getRpcResult(false, TransactionStatus.FAILED, Collections.emptySet());
// Base class for a data modification transaction bound to an AbstractDataBroker. Reads and the
// commit are delegated to the broker; status changes are reported to subclasses through
// onStatusChange.
311 public abstract class AbstractDataTransaction<P extends Path<P>, D> extends AbstractDataModification<P, D> {
// Identifier of this transaction; used by hashCode/equals and in TwoPhaseCommit's log messages.
314 private val Object identifier;
// Current status; set to NEW in the constructor and replaced by changeStatus.
316 var TransactionStatus status;
// Broker that serves reads and performs the commit for this transaction.
318 var AbstractDataBroker<P, D, ?> broker;
// NOTE(review): the first constructor statement and the assignment of `broker` are not visible
// here — presumably broker is set from dataBroker; confirm.
320 protected new(Object identifier,AbstractDataBroker<P, D, ?> dataBroker) {
322 _identifier = identifier;
324 status = TransactionStatus.NEW;
326 //listeners = new ListenerRegistry<>();
// Hands this transaction to the broker for committing (the enclosing method's header is not
// visible here).
330 return broker.commit(this);
// Reads configuration data through the broker.
333 override readConfigurationData(P path) {
334 return broker.readConfigurationData(path);
// Reads operational data through the broker.
337 override readOperationalData(P path) {
338 return broker.readOperationalData(path);
// Hash is derived from the identifier only.
// NOTE(review): equals below tolerates a null identifier, this call does not — confirm the
// identifier can never be null.
341 override hashCode() {
342 return identifier.hashCode;
// Two transactions are compared by runtime class, then broker, then identifier; null broker or
// identifier only matches null on the other side. The returned values and the checks before
// the class comparison are not visible here.
345 override equals(Object obj) {
350 if (getClass() != obj.getClass())
352 val other = (obj as AbstractDataTransaction<P,D>);
353 if (broker == null) {
354 if (other.broker != null)
356 } else if (!broker.equals(other.broker))
358 if (identifier == null) {
359 if (other.identifier != null)
361 } else if (!identifier.equals(other.identifier))
// Status accessor; the body is not visible here — presumably returns `status`.
366 override TransactionStatus getStatus() {
// Hook invoked by changeStatus after the status field has been updated.
370 protected abstract def void onStatusChange(TransactionStatus status);
// Stores the new status, then notifies the subclass through onStatusChange.
372 public def changeStatus(TransactionStatus status) {
373 this.status = status;
374 onStatusChange(status);