1 package org.opendaylight.controller.md.sal.common.impl.service
3 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler
4 import org.opendaylight.controller.md.sal.common.api.TransactionStatus
5 import org.opendaylight.controller.md.sal.common.api.data.DataReader
6 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration
7 import org.opendaylight.yangtools.concepts.ListenerRegistration
8 import com.google.common.collect.Multimap
9 import static com.google.common.base.Preconditions.*;
11 import com.google.common.collect.HashMultimap
12 import java.util.concurrent.ExecutorService
13 import java.util.concurrent.Callable
14 import org.opendaylight.yangtools.yang.common.RpcResult
15 import java.util.Collections
16 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction
17 import java.util.ArrayList
18 import org.opendaylight.yangtools.concepts.CompositeObjectRegistration
19 import java.util.Arrays
20 import org.opendaylight.controller.md.sal.common.api.data.DataProvisionService
21 import org.opendaylight.controller.md.sal.common.api.data.DataModificationTransactionFactory
22 import org.opendaylight.controller.md.sal.common.api.data.DataChangePublisher
23 import org.opendaylight.controller.md.sal.common.api.data.DataChangeListener
24 import org.opendaylight.controller.sal.common.util.Rpcs
25 import org.opendaylight.controller.md.sal.common.impl.AbstractDataModification
26 import java.util.concurrent.Future
27 import org.opendaylight.controller.md.sal.common.impl.routing.AbstractDataReadRouter
28 import org.opendaylight.yangtools.concepts.Path
29 import org.slf4j.LoggerFactory
30 import java.util.HashSet
31 import java.util.Map.Entry
32 import java.util.Iterator
33 import java.util.Collection
34 import com.google.common.collect.FluentIterable;
36 import com.google.common.collect.ImmutableList
37 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration
38 import org.opendaylight.controller.md.sal.common.api.RegistrationListener
39 import org.opendaylight.yangtools.concepts.util.ListenerRegistry
40 import java.util.concurrent.atomic.AtomicLong
/**
 * Base data broker. It keeps data change listener registrations and commit
 * handler registrations per path, delegates reads and reader registration to
 * dataReadRouter, and submits transactions to executor as TwoPhaseCommit tasks.
 *
 * NOTE(review): several statements of this class are not shown in this listing
 * (parameter lists, `try` openers, some returns and closing braces). The
 * comments below describe only the statements that are visible.
 */
42 abstract class AbstractDataBroker<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> implements DataModificationTransactionFactory<P, D>, //
44 DataChangePublisher<P, D, DCL>, //
45 DataProvisionService<P, D> {
// Logger for the broker class.
47 private static val LOG = LoggerFactory.getLogger(AbstractDataBroker);
// Executor to which commit() submits its TwoPhaseCommit task.
50 var ExecutorService executor;
// Router used for configuration/operational reads and for reader registration.
53 var AbstractDataReadRouter<P, D> dataReadRouter;
// Data change listener registrations, keyed by the path they were registered for.
55 Multimap<P, DataChangeListenerRegistration<P, D, DCL>> listeners = HashMultimap.create();
// Commit handler registrations, keyed by the path they were registered for.
56 Multimap<P, DataCommitHandlerRegistrationImpl<P, D>> commitHandlers = HashMultimap.create();
// Listeners that get onRegister / onUnregister calls when a commit handler is added or removed.
58 val ListenerRegistry<RegistrationListener<DataCommitHandlerRegistration<P,D>>> commitHandlerRegistrationListeners = new ListenerRegistry();
/**
 * Builds a list of the commit handler instances whose registration path is
 * reported by isAffectedBy as affected by `paths`.
 *
 * NOTE(review): the parameter list (which declares `paths`) is not shown here.
 * The inline type comment in the signature mentions an Iterator of entries of
 * listener registrations, which does not match the `toList()` of handler
 * instances built below - it looks stale; confirm and update.
 */
62 protected def /*Iterator<Entry<Collection<DataChangeListenerRegistration<P,D,DCL>>,D>>*/ affectedCommitHandlers(
// Keep the map entries with an affected key, flatten their registrations, then unwrap each handler instance.
64 return FluentIterable.from(commitHandlers.asMap.entrySet).filter[key.isAffectedBy(paths)] //
65 .transformAndConcat[value] //
66 .transform[instance].toList()
/** Reads configuration data for `path` by delegating to dataReadRouter. */
69 override final readConfigurationData(P path) {
70 return dataReadRouter.readConfigurationData(path);
/** Reads operational data for `path` by delegating to dataReadRouter. */
73 override final readOperationalData(P path) {
74 return dataReadRouter.readOperationalData(path);
/**
 * Registers `commitHandler` for `path`: stores a new registration in
 * commitHandlers, logs it at info level, and calls onRegister on every
 * commit handler registration listener.
 *
 * NOTE(review): the `try` opener around the listener call and the value this
 * method returns are not shown here.
 */
77 override final registerCommitHandler(P path, DataCommitHandler<P, D> commitHandler) {
78 val registration = new DataCommitHandlerRegistrationImpl(path, commitHandler, this);
79 commitHandlers.put(path, registration)
80 LOG.info("Registering Commit Handler {} for path: {}",commitHandler,path);
81 for(listener : commitHandlerRegistrationListeners) {
83 listener.instance.onRegister(registration);
// An Exception caught for this listener is logged at error level together with the listener instance.
84 } catch (Exception e) {
85 LOG.error("Unexpected exception in listener {} during invoking onRegister",listener.instance,e);
/**
 * Creates a registration for `listener` under `path` and adds it to `listeners`.
 * NOTE(review): the returned value is not shown here.
 */
91 override final def registerDataChangeListener(P path, DCL listener) {
92 val reg = new DataChangeListenerRegistration(path, listener, this);
93 listeners.put(path, reg);
/**
 * Registers `reader` with dataReadRouter as both the configuration reader and
 * the operational reader for `path`, and returns a single composite
 * registration built from `reader` and the two router registrations.
 */
97 final def registerDataReader(P path, DataReader<P, D> reader) {
99 val confReg = dataReadRouter.registerConfigurationReader(path, reader);
100 val dataReg = dataReadRouter.registerOperationalReader(path, reader);
102 return new CompositeObjectRegistration(reader, Arrays.asList(confReg, dataReg));
/**
 * Adds `commitHandlerListener` to the registry of commit handler registration
 * listeners. NOTE(review): what is done with `ret` afterwards is not shown here.
 */
105 override registerCommitHandlerListener(RegistrationListener<DataCommitHandlerRegistration<P, D>> commitHandlerListener) {
106 val ret = commitHandlerRegistrationListeners.register(commitHandlerListener);
/**
 * Removes `registration` from `listeners` under its own path. Called from
 * DataChangeListenerRegistration.removeRegistration().
 */
112 protected final def removeListener(DataChangeListenerRegistration<P, D, DCL> registration) {
113 listeners.remove(registration.path, registration);
/**
 * Removes `registration` from commitHandlers under its own path, logs the
 * removal at info level, and calls onUnregister on every commit handler
 * registration listener. Called from
 * DataCommitHandlerRegistrationImpl.removeRegistration().
 */
116 protected final def removeCommitHandler(DataCommitHandlerRegistrationImpl<P, D> registration) {
117 commitHandlers.remove(registration.path, registration);
119 LOG.info("Removing Commit Handler {} for path: {}",registration.instance,registration.path);
120 for(listener : commitHandlerRegistrationListeners) {
122 listener.instance.onUnregister(registration);
// An Exception caught for this listener is logged at error level together with the listener instance.
123 } catch (Exception e) {
124 LOG.error("Unexpected exception in listener {} during invoking onUnregister",listener.instance,e);
/** Returns the (path, registration) entries currently held in commitHandlers. */
129 protected final def getActiveCommitHandlers() {
130 return commitHandlers.entries;
/**
 * For every listener path reported by isAffectedBy as affected by `paths`,
 * reads the current operational and configuration data for that path and
 * wraps path, registrations and both reads in a ListenerStateCapture.
 * TwoPhaseCommit.call() invokes this before it runs the commit handlers.
 *
 * NOTE(review): the parameter list and the end of the call chain are not shown
 * here, so it cannot be told from this listing whether the reads happen
 * eagerly or lazily; confirm.
 */
133 protected def /*Iterator<Entry<Collection<DataChangeListenerRegistration<P,D,DCL>>,D>>*/ affectedListenersWithInitialState(
135 return FluentIterable.from(listeners.asMap.entrySet).filter[key.isAffectedBy(paths)].transform [
// `key` is the listener path and `value` the registrations stored under it.
136 val operationalState = readOperationalData(key)
137 val configurationState = readConfigurationData(key)
138 return new ListenerStateCapture(key, value, operationalState, configurationState)
/**
 * Decides whether `key` is affected by the changed `paths`. The visible checks
 * are: `paths` contains `key` itself, or `key.contains(path)`.
 *
 * NOTE(review): the declaration of `path` and the values returned in each
 * branch are not shown here; presumably `path` iterates over `paths` - confirm.
 */
142 protected def boolean isAffectedBy(P key, Set<P> paths) {
143 if (paths.contains(key)) {
147 if (key.contains(path)) {
/**
 * Starts the commit of `transaction`: rejects null via checkNotNull, changes
 * the transaction status to SUBMITED, and submits a TwoPhaseCommit task to
 * `executor`. Returns the Future handed back by the executor.
 */
155 package final def Future<RpcResult<TransactionStatus>> commit(AbstractDataTransaction<P, D> transaction) {
156 checkNotNull(transaction);
157 transaction.changeStatus(TransactionStatus.SUBMITED);
158 val task = new TwoPhaseCommit(transaction, this);
159 return executor.submit(task);
/**
 * Holder built by AbstractDataBroker.affectedListenersWithInitialState() from a
 * listener path, the registrations stored under it, and the operational and
 * configuration data read at that moment.
 *
 * NOTE(review): the `path` member read by TwoPhaseCommit and the constructor
 * are not shown in this listing.
 */
165 package class ListenerStateCapture<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> {
// Listener registrations covered by this capture.
171 Collection<DataChangeListenerRegistration<P, D, DCL>> listeners;
// Passed to the change event as the initial operational state; presumably the data read at capture time - confirm against the constructor.
174 D initialOperationalState;
// Passed to the change event as the initial configuration state; presumably the data read at capture time - confirm against the constructor.
177 D initialConfigurationState;
/**
 * Registration handle for a data change listener. Removing the registration
 * calls back into the owning broker so it drops the entry from its listener map.
 */
180 package class DataChangeListenerRegistration<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> extends AbstractObjectRegistration<DCL> implements ListenerRegistration<DCL> {
// Broker that removeRegistration() calls back into.
182 AbstractDataBroker<P, D, DCL> dataBroker;
// NOTE(review): the constructor body is not shown here; presumably it stores path and broker - confirm.
187 new(P path, DCL instance, AbstractDataBroker<P, D, DCL> broker) {
// Unregisters this handle from the broker.
193 override protected removeRegistration() {
194 dataBroker.removeListener(this);
/**
 * Registration handle for a commit handler. Removing the registration calls
 * back into the owning broker's removeCommitHandler(), which drops the entry
 * and notifies the commit handler registration listeners.
 */
200 package class DataCommitHandlerRegistrationImpl<P extends Path<P>, D> //
201 extends AbstractObjectRegistration<DataCommitHandler<P, D>> //
202 implements DataCommitHandlerRegistration<P, D> {
// Broker that removeRegistration() calls back into.
204 AbstractDataBroker<P, D, ?> dataBroker;
// NOTE(review): the constructor body is not shown here; presumably it stores path and broker - confirm.
209 new(P path, DataCommitHandler<P, D> instance, AbstractDataBroker<P, D, ?> broker) {
// Unregisters this handle from the broker.
215 override protected removeRegistration() {
216 dataBroker.removeCommitHandler(this);
/**
 * Callable that commits one transaction against the broker's commit handlers
 * in two phases: requestCommit() on every affected handler, then finish() on
 * every sub-transaction those calls returned. An Exception in either phase is
 * logged and answered with rollback().
 *
 * NOTE(review): `try` openers and closing braces are not shown in this
 * listing; comments describe only the visible statements.
 */
221 package class TwoPhaseCommit<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> implements Callable<RpcResult<TransactionStatus>> {
223 private static val log = LoggerFactory.getLogger(TwoPhaseCommit);
// Transaction being committed.
225 val AbstractDataTransaction<P, D> transaction;
// Broker that supplies the affected listeners and commit handlers and serves the reads.
226 val AbstractDataBroker<P, D, DCL> dataBroker;
228 new(AbstractDataTransaction<P, D> transaction, AbstractDataBroker<P, D, DCL> broker) {
229 this.transaction = transaction;
230 this.dataBroker = broker;
/**
 * Runs the commit. Returns the result built from (true, COMMITED, empty set)
 * when both phases complete, otherwise whatever rollback() returns.
 *
 * NOTE(review): no visible line changes the transaction's status after the
 * broker set it to SUBMITED; confirm whether that is handled elsewhere.
 */
233 override call() throws Exception {
235 // get affected paths
236 val affectedPaths = new HashSet<P>();
// Union of every created, updated and removed path, configuration first ...
238 affectedPaths.addAll(transaction.createdConfigurationData.keySet);
239 affectedPaths.addAll(transaction.updatedConfigurationData.keySet);
240 affectedPaths.addAll(transaction.removedConfigurationData);
// ... then operational.
242 affectedPaths.addAll(transaction.createdOperationalData.keySet);
243 affectedPaths.addAll(transaction.updatedOperationalData.keySet);
244 affectedPaths.addAll(transaction.removedOperationalData);
// Requested before any commit handler runs, so the captured reads can serve as the "initial" state of the change event.
246 val listeners = dataBroker.affectedListenersWithInitialState(affectedPaths);
248 val transactionId = transaction.identifier;
250 log.info("Transaction: {} Started.",transactionId);
251 // requesting commits
252 val Iterable<DataCommitHandler<P, D>> commitHandlers = dataBroker.affectedCommitHandlers(affectedPaths);
253 val List<DataCommitTransaction<P, D>> handlerTransactions = new ArrayList();
// Phase 1: ask every affected handler for a sub-transaction.
255 for (handler : commitHandlers) {
256 handlerTransactions.add(handler.requestCommit(transaction));
// On failure, roll back the sub-transactions collected so far.
258 } catch (Exception e) {
259 log.error("Transaction: {} Request Commit failed", transactionId,e);
260 return rollback(handlerTransactions, e);
// NOTE(review): `results` is filled below but never read in the visible lines.
262 val List<RpcResult<Void>> results = new ArrayList();
// Phase 2: finish every sub-transaction, then notify the captured listeners.
264 for (subtransaction : handlerTransactions) {
265 results.add(subtransaction.finish());
267 listeners.publishDataChangeEvent();
// A failure here rolls back all sub-transactions.
268 } catch (Exception e) {
269 log.error("Transaction: {} Finish Commit failed",transactionId, e);
270 return rollback(handlerTransactions, e);
272 log.info("Transaction: {} Finished succesfully.",transactionId);
273 return Rpcs.getRpcResult(true, TransactionStatus.COMMITED, Collections.emptySet());
/**
 * For each captured listener set, re-reads configuration and operational data
 * for its path, builds one change event from the transaction, the captured
 * initial states and the fresh reads, and passes it to onDataChanged() of
 * every listener in the set.
 */
277 def void publishDataChangeEvent(ImmutableList<ListenerStateCapture<P, D, DCL>> listeners) {
278 for (listenerSet : listeners) {
279 val updatedConfiguration = dataBroker.readConfigurationData(listenerSet.path);
280 val updatedOperational = dataBroker.readOperationalData(listenerSet.path);
// NOTE(review): argument order is (initial configuration, initial operational, updated operational, updated configuration) - the last two are swapped relative to the first two; confirm against the DataChangeEventImpl constructor.
282 val changeEvent = new DataChangeEventImpl(transaction, listenerSet.initialConfigurationState,
283 listenerSet.initialOperationalState, updatedOperational, updatedConfiguration);
284 for (listener : listenerSet.listeners) {
286 listener.instance.onDataChanged(changeEvent);
// NOTE(review): the handling inside this catch is not shown here.
288 } catch (Exception e) {
/**
 * Calls rollback() on every sub-transaction in `transactions` and returns the
 * result built from (false, FAILED, empty set).
 *
 * NOTE(review): `e` is not used in the visible lines (see the FIXME below),
 * and the loop variable `transaction` shadows the field of the same name.
 * No guard around the individual rollback() calls is visible, so an exception
 * from one of them would presumably skip the remaining ones - confirm.
 */
295 def rollback(List<DataCommitTransaction<P, D>> transactions, Exception e) {
296 for (transaction : transactions) {
297 transaction.rollback()
300 // FIXME return encountered error.
301 return Rpcs.getRpcResult(false, TransactionStatus.FAILED, Collections.emptySet());
/**
 * Data modification transaction bound to an AbstractDataBroker. It reads
 * through the broker, hands itself to broker.commit(), and reports status
 * changes to subclasses through onStatusChange().
 *
 * NOTE(review): several method headers and return statements are not shown in
 * this listing; comments describe only the visible statements.
 */
305 public abstract class AbstractDataTransaction<P extends Path<P>, D> extends AbstractDataModification<P, D> {
// Identifier used by hashCode() and equals(), and logged by TwoPhaseCommit as the transaction id.
308 private val Object identifier;
// Current status; set to NEW in the constructor and replaced by changeStatus().
310 var TransactionStatus status;
// Broker that serves the reads and performs the commit.
312 var AbstractDataBroker<P, D, ?> broker;
/**
 * Stores the identifier and starts the transaction in status NEW.
 * NOTE(review): the assignment of `broker` is not shown here.
 */
314 protected new(Object identifier,AbstractDataBroker<P, D, ?> dataBroker) {
316 _identifier = identifier;
318 status = TransactionStatus.NEW;
320 //listeners = new ListenerRegistry<>();
// Hands this transaction to the broker for commit (the enclosing method header is not shown here).
324 return broker.commit(this);
/** Reads configuration data for `path` through the broker. */
327 override readConfigurationData(P path) {
328 return broker.readConfigurationData(path);
/** Reads operational data for `path` through the broker. */
331 override readOperationalData(P path) {
332 return broker.readOperationalData(path);
/**
 * Hash code of the identifier.
 * NOTE(review): equals() null-checks `identifier`, this method does not.
 */
335 override hashCode() {
336 return identifier.hashCode;
/**
 * Equality check. The visible comparisons are: same runtime class, then
 * `broker`, then `identifier`, each with a null-safe branch.
 * NOTE(review): the values returned in each branch are not shown here.
 */
339 override equals(Object obj) {
344 if (getClass() != obj.getClass())
346 val other = (obj as AbstractDataTransaction<P,D>);
347 if (broker == null) {
348 if (other.broker != null)
350 } else if (!broker.equals(other.broker))
352 if (identifier == null) {
353 if (other.identifier != null)
355 } else if (!identifier.equals(other.identifier))
// NOTE(review): body not shown here; presumably returns `status` - confirm.
360 override TransactionStatus getStatus() {
/** Hook for subclasses; changeStatus() calls it after the new status has been stored. */
364 protected abstract def void onStatusChange(TransactionStatus status);
/** Stores `status` as the current status, then notifies onStatusChange() with it. */
366 public def changeStatus(TransactionStatus status) {
367 this.status = status;
368 onStatusChange(status);