1 package org.opendaylight.controller.md.sal.common.impl.service
\r
3 import com.google.common.collect.FluentIterable
\r
4 import com.google.common.collect.HashMultimap
\r
5 import com.google.common.collect.ImmutableList
\r
6 import com.google.common.collect.Multimap
\r
7 import java.util.ArrayList
\r
8 import java.util.Arrays
\r
9 import java.util.Collection
\r
10 import java.util.Collections
\r
11 import java.util.HashSet
\r
12 import java.util.List
\r
13 import java.util.Set
\r
14 import java.util.concurrent.Callable
\r
15 import java.util.concurrent.ExecutorService
\r
16 import java.util.concurrent.Future
\r
17 import java.util.concurrent.atomic.AtomicLong
\r
18 import org.opendaylight.controller.md.sal.common.api.RegistrationListener
\r
19 import org.opendaylight.controller.md.sal.common.api.TransactionStatus
\r
20 import org.opendaylight.controller.md.sal.common.api.data.DataChangeListener
\r
21 import org.opendaylight.controller.md.sal.common.api.data.DataChangePublisher
\r
22 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler
\r
23 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction
\r
24 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration
\r
25 import org.opendaylight.controller.md.sal.common.api.data.DataModificationTransactionFactory
\r
26 import org.opendaylight.controller.md.sal.common.api.data.DataProvisionService
\r
27 import org.opendaylight.controller.md.sal.common.api.data.DataReader
\r
28 import org.opendaylight.controller.md.sal.common.impl.AbstractDataModification
\r
29 import org.opendaylight.controller.md.sal.common.impl.routing.AbstractDataReadRouter
\r
30 import org.opendaylight.controller.sal.common.util.Rpcs
\r
31 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration
\r
32 import org.opendaylight.yangtools.concepts.CompositeObjectRegistration
\r
33 import org.opendaylight.yangtools.concepts.ListenerRegistration
\r
34 import org.opendaylight.yangtools.concepts.Path
\r
35 import org.opendaylight.yangtools.concepts.util.ListenerRegistry
\r
36 import org.opendaylight.yangtools.yang.common.RpcResult
\r
37 import org.slf4j.LoggerFactory
\r
39 import static com.google.common.base.Preconditions.*
\rimport org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent
41 abstract class AbstractDataBroker<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> implements DataModificationTransactionFactory<P, D>, //
\r
42 DataReader<P, D>, //
\r
43 DataChangePublisher<P, D, DCL>, //
\r
44 DataProvisionService<P, D> {
\r
46 private static val LOG = LoggerFactory.getLogger(AbstractDataBroker);
\r
49 var ExecutorService executor;
\r
52 var AbstractDataReadRouter<P, D> dataReadRouter;
\r
55 private val AtomicLong submittedTransactionsCount = new AtomicLong;
\r
58 private val AtomicLong failedTransactionsCount = new AtomicLong
\r
61 private val AtomicLong finishedTransactionsCount = new AtomicLong
\r
63 Multimap<P, DataChangeListenerRegistration<P, D, DCL>> listeners = HashMultimap.create();
\r
64 Multimap<P, DataCommitHandlerRegistrationImpl<P, D>> commitHandlers = HashMultimap.create();
\r
66 val ListenerRegistry<RegistrationListener<DataCommitHandlerRegistration<P,D>>> commitHandlerRegistrationListeners = new ListenerRegistry();
\r
70 protected def /*Iterator<Entry<Collection<DataChangeListenerRegistration<P,D,DCL>>,D>>*/ affectedCommitHandlers(
\r
72 return FluentIterable.from(commitHandlers.asMap.entrySet).filter[key.isAffectedBy(paths)] //
\r
73 .transformAndConcat[value] //
\r
74 .transform[instance].toList()
\r
77 override final readConfigurationData(P path) {
\r
78 return dataReadRouter.readConfigurationData(path);
\r
81 override final readOperationalData(P path) {
\r
82 return dataReadRouter.readOperationalData(path);
\r
85 override final registerCommitHandler(P path, DataCommitHandler<P, D> commitHandler) {
\r
86 val registration = new DataCommitHandlerRegistrationImpl(path, commitHandler, this);
\r
87 commitHandlers.put(path, registration)
\r
88 LOG.info("Registering Commit Handler {} for path: {}",commitHandler,path);
\r
89 for(listener : commitHandlerRegistrationListeners) {
\r
91 listener.instance.onRegister(registration);
\r
92 } catch (Exception e) {
\r
93 LOG.error("Unexpected exception in listener {} during invoking onRegister",listener.instance,e);
\r
96 return registration;
\r
99 override final def registerDataChangeListener(P path, DCL listener) {
\r
100 val reg = new DataChangeListenerRegistration(path, listener, this);
\r
101 listeners.put(path, reg);
\r
102 val initialConfig = dataReadRouter.readConfigurationData(path);
\r
103 val initialOperational = dataReadRouter.readOperationalData(path);
\r
104 val event = createInitialListenerEvent(path,initialConfig,initialOperational);
\r
105 listener.onDataChanged(event);
\r
109 final def registerDataReader(P path, DataReader<P, D> reader) {
\r
111 val confReg = dataReadRouter.registerConfigurationReader(path, reader);
\r
112 val dataReg = dataReadRouter.registerOperationalReader(path, reader);
\r
114 return new CompositeObjectRegistration(reader, Arrays.asList(confReg, dataReg));
\r
117 override registerCommitHandlerListener(RegistrationListener<DataCommitHandlerRegistration<P, D>> commitHandlerListener) {
\r
118 val ret = commitHandlerRegistrationListeners.register(commitHandlerListener);
\r
123 protected def DataChangeEvent<P,D> createInitialListenerEvent(P path,D initialConfig,D initialOperational) {
\r
124 return new InitialDataChangeEventImpl<P, D>(initialConfig,initialOperational);
\r
128 protected final def removeListener(DataChangeListenerRegistration<P, D, DCL> registration) {
\r
129 listeners.remove(registration.path, registration);
\r
132 protected final def removeCommitHandler(DataCommitHandlerRegistrationImpl<P, D> registration) {
\r
133 commitHandlers.remove(registration.path, registration);
\r
135 LOG.info("Removing Commit Handler {} for path: {}",registration.instance,registration.path);
\r
136 for(listener : commitHandlerRegistrationListeners) {
\r
138 listener.instance.onUnregister(registration);
\r
139 } catch (Exception e) {
\r
140 LOG.error("Unexpected exception in listener {} during invoking onUnregister",listener.instance,e);
\r
145 protected final def getActiveCommitHandlers() {
\r
146 return commitHandlers.entries;
\r
149 protected def /*Iterator<Entry<Collection<DataChangeListenerRegistration<P,D,DCL>>,D>>*/ affectedListenersWithInitialState(
\r
150 HashSet<P> paths) {
\r
151 return FluentIterable.from(listeners.asMap.entrySet).filter[key.isAffectedBy(paths)].transform [
\r
152 val operationalState = readOperationalData(key)
\r
153 val configurationState = readConfigurationData(key)
\r
154 return new ListenerStateCapture(key, value, operationalState, configurationState)
\r
158 protected def boolean isAffectedBy(P key, Set<P> paths) {
\r
159 if (paths.contains(key)) {
\r
162 for (path : paths) {
\r
163 if (key.contains(path)) {
\r
171 package final def Future<RpcResult<TransactionStatus>> commit(AbstractDataTransaction<P, D> transaction) {
\r
172 checkNotNull(transaction);
\r
173 transaction.changeStatus(TransactionStatus.SUBMITED);
\r
174 val task = new TwoPhaseCommit(transaction, this);
\r
175 submittedTransactionsCount.andIncrement;
\r
176 return executor.submit(task);
\r
182 package class ListenerStateCapture<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> {
\r
188 Collection<DataChangeListenerRegistration<P, D, DCL>> listeners;
\r
191 D initialOperationalState;
\r
194 D initialConfigurationState;
\r
197 package class DataChangeListenerRegistration<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> extends AbstractObjectRegistration<DCL> implements ListenerRegistration<DCL> {
\r
199 AbstractDataBroker<P, D, DCL> dataBroker;
\r
204 new(P path, DCL instance, AbstractDataBroker<P, D, DCL> broker) {
\r
206 dataBroker = broker;
\r
210 override protected removeRegistration() {
\r
211 dataBroker.removeListener(this);
\r
217 package class DataCommitHandlerRegistrationImpl<P extends Path<P>, D> //
\r
218 extends AbstractObjectRegistration<DataCommitHandler<P, D>> //
\r
219 implements DataCommitHandlerRegistration<P, D> {
\r
221 AbstractDataBroker<P, D, ?> dataBroker;
\r
226 new(P path, DataCommitHandler<P, D> instance, AbstractDataBroker<P, D, ?> broker) {
\r
228 dataBroker = broker;
\r
232 override protected removeRegistration() {
\r
233 dataBroker.removeCommitHandler(this);
\r
238 package class TwoPhaseCommit<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> implements Callable<RpcResult<TransactionStatus>> {
\r
240 private static val log = LoggerFactory.getLogger(TwoPhaseCommit);
\r
242 val AbstractDataTransaction<P, D> transaction;
\r
243 val AbstractDataBroker<P, D, DCL> dataBroker;
\r
245 new(AbstractDataTransaction<P, D> transaction, AbstractDataBroker<P, D, DCL> broker) {
\r
246 this.transaction = transaction;
\r
247 this.dataBroker = broker;
\r
250 override call() throws Exception {
\r
252 // get affected paths
\r
253 val affectedPaths = new HashSet<P>();
\r
255 affectedPaths.addAll(transaction.createdConfigurationData.keySet);
\r
256 affectedPaths.addAll(transaction.updatedConfigurationData.keySet);
\r
257 affectedPaths.addAll(transaction.removedConfigurationData);
\r
259 affectedPaths.addAll(transaction.createdOperationalData.keySet);
\r
260 affectedPaths.addAll(transaction.updatedOperationalData.keySet);
\r
261 affectedPaths.addAll(transaction.removedOperationalData);
\r
263 val listeners = dataBroker.affectedListenersWithInitialState(affectedPaths);
\r
265 val transactionId = transaction.identifier;
\r
267 log.info("Transaction: {} Started.",transactionId);
\r
268 // requesting commits
\r
269 val Iterable<DataCommitHandler<P, D>> commitHandlers = dataBroker.affectedCommitHandlers(affectedPaths);
\r
270 val List<DataCommitTransaction<P, D>> handlerTransactions = new ArrayList();
\r
272 for (handler : commitHandlers) {
\r
273 handlerTransactions.add(handler.requestCommit(transaction));
\r
275 } catch (Exception e) {
\r
276 log.error("Transaction: {} Request Commit failed", transactionId,e);
\r
277 dataBroker.failedTransactionsCount.andIncrement
\r
278 return rollback(handlerTransactions, e);
\r
280 val List<RpcResult<Void>> results = new ArrayList();
\r
282 for (subtransaction : handlerTransactions) {
\r
283 results.add(subtransaction.finish());
\r
285 listeners.publishDataChangeEvent();
\r
286 } catch (Exception e) {
\r
287 log.error("Transaction: {} Finish Commit failed",transactionId, e);
\r
288 dataBroker.failedTransactionsCount.andIncrement
\r
289 return rollback(handlerTransactions, e);
\r
291 log.info("Transaction: {} Finished successfully.",transactionId);
\r
292 dataBroker.finishedTransactionsCount.andIncrement;
\r
293 return Rpcs.getRpcResult(true, TransactionStatus.COMMITED, Collections.emptySet());
\r
297 def void publishDataChangeEvent(ImmutableList<ListenerStateCapture<P, D, DCL>> listeners) {
\r
298 for (listenerSet : listeners) {
\r
299 val updatedConfiguration = dataBroker.readConfigurationData(listenerSet.path);
\r
300 val updatedOperational = dataBroker.readOperationalData(listenerSet.path);
\r
302 val changeEvent = new DataChangeEventImpl(transaction, listenerSet.initialConfigurationState,
\r
303 listenerSet.initialOperationalState, updatedOperational, updatedConfiguration);
\r
304 for (listener : listenerSet.listeners) {
\r
306 listener.instance.onDataChanged(changeEvent);
\r
308 } catch (Exception e) {
\r
309 e.printStackTrace();
\r
315 def rollback(List<DataCommitTransaction<P, D>> transactions, Exception e) {
\r
316 for (transaction : transactions) {
\r
317 transaction.rollback()
\r
320 // FIXME return encountered error.
\r
321 return Rpcs.getRpcResult(false, TransactionStatus.FAILED, Collections.emptySet());
\r
325 public abstract class AbstractDataTransaction<P extends Path<P>, D> extends AbstractDataModification<P, D> {
\r
328 private val Object identifier;
\r
330 var TransactionStatus status;
\r
332 var AbstractDataBroker<P, D, ?> broker;
\r
334 protected new(Object identifier,AbstractDataBroker<P, D, ?> dataBroker) {
\r
336 _identifier = identifier;
\r
337 broker = dataBroker;
\r
338 status = TransactionStatus.NEW;
\r
340 //listeners = new ListenerRegistry<>();
\r
343 override commit() {
\r
344 return broker.commit(this);
\r
347 override readConfigurationData(P path) {
\r
348 val local = this.updatedConfigurationData.get(path);
\r
349 if(local != null) {
\r
353 return broker.readConfigurationData(path);
\r
356 override readOperationalData(P path) {
\r
357 val local = this.updatedOperationalData.get(path);
\r
358 if(local != null) {
\r
361 return broker.readOperationalData(path);
\r
364 override hashCode() {
\r
365 return identifier.hashCode;
\r
368 override equals(Object obj) {
\r
373 if (getClass() != obj.getClass())
\r
375 val other = (obj as AbstractDataTransaction<P,D>);
\r
376 if (broker == null) {
\r
377 if (other.broker != null)
\r
379 } else if (!broker.equals(other.broker))
\r
381 if (identifier == null) {
\r
382 if (other.identifier != null)
\r
384 } else if (!identifier.equals(other.identifier))
\r
389 override TransactionStatus getStatus() {
\r
393 protected abstract def void onStatusChange(TransactionStatus status);
\r
395 public def changeStatus(TransactionStatus status) {
\r
396 this.status = status;
\r
397 onStatusChange(status);
\r