2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.md.sal.common.impl.service
\r
10 import com.google.common.collect.FluentIterable
\r
11 import com.google.common.collect.HashMultimap
\r
12 import com.google.common.collect.ImmutableList
\r
13 import com.google.common.collect.Multimap
\r
14 import java.util.ArrayList
\r
15 import java.util.Arrays
\r
16 import java.util.Collection
\r
17 import java.util.Collections
\r
18 import java.util.HashSet
\r
19 import java.util.List
\r
20 import java.util.Set
\r
21 import java.util.concurrent.Callable
\r
22 import java.util.concurrent.ExecutorService
\r
23 import java.util.concurrent.Future
\r
24 import java.util.concurrent.atomic.AtomicLong
\r
25 import org.opendaylight.controller.md.sal.common.api.RegistrationListener
\r
26 import org.opendaylight.controller.md.sal.common.api.TransactionStatus
\r
27 import org.opendaylight.controller.md.sal.common.api.data.DataChangeListener
\r
28 import org.opendaylight.controller.md.sal.common.api.data.DataChangePublisher
\r
29 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler
\r
30 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction
\r
31 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration
\r
32 import org.opendaylight.controller.md.sal.common.api.data.DataModificationTransactionFactory
\r
33 import org.opendaylight.controller.md.sal.common.api.data.DataProvisionService
\r
34 import org.opendaylight.controller.md.sal.common.api.data.DataReader
\r
35 import org.opendaylight.controller.md.sal.common.impl.AbstractDataModification
\r
36 import org.opendaylight.controller.md.sal.common.impl.routing.AbstractDataReadRouter
\r
37 import org.opendaylight.controller.sal.common.util.Rpcs
\r
38 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration
\r
39 import org.opendaylight.yangtools.concepts.CompositeObjectRegistration
\r
40 import org.opendaylight.yangtools.concepts.ListenerRegistration
\r
41 import org.opendaylight.yangtools.concepts.Path
\r
42 import org.opendaylight.yangtools.concepts.util.ListenerRegistry
\r
43 import org.opendaylight.yangtools.yang.common.RpcResult
\r
44 import org.slf4j.LoggerFactory
\r
46 import static com.google.common.base.Preconditions.*
\rimport org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent
47 import com.google.common.collect.Multimaps
48 import java.util.concurrent.locks.Lock
49 import java.util.concurrent.locks.ReentrantLock
51 abstract class AbstractDataBroker<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> implements DataModificationTransactionFactory<P, D>, //
\r
52 DataReader<P, D>, //
\r
53 DataChangePublisher<P, D, DCL>, //
\r
54 DataProvisionService<P, D> {
\r
// Logger shared by the broker's registration and removal methods.
56 private static val LOG = LoggerFactory.getLogger(AbstractDataBroker);
\r
// Executor that commit() submits TwoPhaseCommit tasks to; the change-notification task is submitted here as well.
59 var ExecutorService executor;
\r
// Router to which every configuration/operational read is delegated and with which data readers are registered.
62 var AbstractDataReadRouter<P, D> dataReadRouter;
\r
// Incremented once per call to commit().
// NOTE(review): the three counters are declared private but TwoPhaseCommit reads them via dataBroker.<name>;
// presumably an accessor-generating annotation sits on lines missing from this excerpt - confirm.
65 private val AtomicLong submittedTransactionsCount = new AtomicLong;
\r
// Incremented by TwoPhaseCommit when the request-commit or finish phase throws.
68 private val AtomicLong failedTransactionsCount = new AtomicLong
\r
// Incremented by TwoPhaseCommit after all sub-transactions finished without an exception.
71 private val AtomicLong finishedTransactionsCount = new AtomicLong
\r
// Data change listener registrations keyed by the path they were registered for (synchronized multimap wrapper).
73 Multimap<P, DataChangeListenerRegistration<P, D, DCL>> listeners = Multimaps.synchronizedSetMultimap(HashMultimap.create());
\r
// Commit handler registrations keyed by the path they were registered for (synchronized multimap wrapper).
74 Multimap<P, DataCommitHandlerRegistrationImpl<P, D>> commitHandlers = Multimaps.synchronizedSetMultimap(HashMultimap.create());
\r
// Lock handed to withLock(...) by the registration, removal and lookup methods of this class.
76 private val Lock registrationLock = new ReentrantLock;
// Listeners whose onRegister/onUnregister is invoked when a commit handler is added or removed.
78 val ListenerRegistry<RegistrationListener<DataCommitHandlerRegistration<P,D>>> commitHandlerRegistrationListeners = new ListenerRegistry();
\r
/**
 * Collects the commit handler instances registered under every path for which
 * isAffectedBy(registeredPath, paths) holds.
 *
 * The lookup is passed to withLock together with registrationLock; the matching registrations
 * of all affected paths are concatenated, mapped to their handler instance and
 * collected with toList().
 *
 * NOTE(review): the parameter list line is missing from this excerpt; the body refers to the
 * argument as `paths`.
 */
82 protected def /*Iterator<Entry<Collection<DataChangeListenerRegistration<P,D,DCL>>,D>>*/ affectedCommitHandlers(
\r
84 return withLock(registrationLock) [|
\r
85 return FluentIterable.from(commitHandlers.asMap.entrySet).filter[key.isAffectedBy(paths)] //
\r
86 .transformAndConcat[value] //
\r
87 .transform[instance].toList()
/**
 * Reads configuration data for the given path by delegating to dataReadRouter.
 *
 * @param path path to read
 * @return whatever dataReadRouter.readConfigurationData(path) returns
 */
91 override final readConfigurationData(P path) {
\r
92 return dataReadRouter.readConfigurationData(path);
\r
/**
 * Reads operational data for the given path by delegating to dataReadRouter.
 *
 * @param path path to read
 * @return whatever dataReadRouter.readOperationalData(path) returns
 */
95 override final readOperationalData(P path) {
\r
96 return dataReadRouter.readOperationalData(path);
\r
/**
 * Static helper that receives a Lock and a Callable; callers in this class pass registrationLock
 * and a closure and return the helper's result.
 *
 * NOTE(review): the body is not part of this excerpt; presumably it invokes `method` while
 * holding `lock` and releases the lock afterwards - confirm against the full source.
 */
99 private static def <T> withLock(Lock lock,Callable<T> method) {
/**
 * Registers a commit handler for the given path.
 *
 * Inside the withLock(registrationLock) closure a DataCommitHandlerRegistrationImpl is created,
 * stored in commitHandlers under `path`, and every listener in
 * commitHandlerRegistrationListeners is told about it through onRegister.
 *
 * @param path path the handler is responsible for
 * @param commitHandler handler to register
 */
108 override final registerCommitHandler(P path, DataCommitHandler<P, D> commitHandler) {
109 return withLock(registrationLock) [|
\r
110 val registration = new DataCommitHandlerRegistrationImpl(path, commitHandler, this);
\r
111 commitHandlers.put(path, registration)
\r
112 LOG.trace("Registering Commit Handler {} for path: {}",commitHandler,path);
\r
// Notify registration listeners; an exception thrown by a listener is caught and logged below.
113 for(listener : commitHandlerRegistrationListeners) {
\r
115 listener.instance.onRegister(registration);
\r
116 } catch (Exception e) {
\r
117 LOG.error("Unexpected exception in listener {} during invoking onRegister",listener.instance,e);
\r
/**
 * Registers a data change listener for the given path and immediately delivers an initial event.
 *
 * The registration is stored in `listeners`, the current configuration and operational data for
 * `path` are read through dataReadRouter, wrapped by createInitialListenerEvent and passed to
 * listener.onDataChanged.
 *
 * NOTE(review): the reads and the listener callback are part of the closure passed to
 * withLock(registrationLock), i.e. listener code is invoked from inside that closure.
 *
 * @param path path the listener is interested in
 * @param listener listener to register
 */
124 override final def registerDataChangeListener(P path, DCL listener) {
\r
125 return withLock(registrationLock) [|
126 val reg = new DataChangeListenerRegistration(path, listener, this);
\r
127 listeners.put(path, reg);
\r
128 val initialConfig = dataReadRouter.readConfigurationData(path);
\r
129 val initialOperational = dataReadRouter.readOperationalData(path);
\r
130 val event = createInitialListenerEvent(path,initialConfig,initialOperational);
\r
131 listener.onDataChanged(event);
\r
/**
 * Registers the reader with dataReadRouter for both configuration and operational data of `path`.
 *
 * @param path path the reader serves
 * @param reader reader to register
 * @return a CompositeObjectRegistration built from the reader and the two router registrations
 */
136 final def registerDataReader(P path, DataReader<P, D> reader) {
\r
137 return withLock(registrationLock) [|
\r
138 val confReg = dataReadRouter.registerConfigurationReader(path, reader);
\r
139 val dataReg = dataReadRouter.registerOperationalReader(path, reader);
\r
141 return new CompositeObjectRegistration(reader, Arrays.asList(confReg, dataReg));
/**
 * Adds a listener to commitHandlerRegistrationListeners; such listeners are called back by
 * registerCommitHandler (onRegister) and removeCommitHandler (onUnregister).
 *
 * NOTE(review): the statement returning `ret` is not part of this excerpt.
 *
 * @param commitHandlerListener listener to add
 */
145 override registerCommitHandlerListener(RegistrationListener<DataCommitHandlerRegistration<P, D>> commitHandlerListener) {
\r
146 val ret = commitHandlerRegistrationListeners.register(commitHandlerListener);
\r
/**
 * Builds the event delivered to a listener right after it registers.
 *
 * @param path path the listener registered for; not used by this implementation
 * @param initialConfig configuration data read at registration time
 * @param initialOperational operational data read at registration time
 * @return an InitialDataChangeEventImpl created from the two data values
 */
150 protected def DataChangeEvent<P,D> createInitialListenerEvent(P path,D initialConfig,D initialOperational) {
\r
151 return new InitialDataChangeEventImpl<P, D>(initialConfig,initialOperational);
\r
/**
 * Removes a data change listener registration from `listeners`, keyed by the registration's path.
 * Called from DataChangeListenerRegistration.removeRegistration().
 *
 * @param registration registration to remove
 */
155 protected final def removeListener(DataChangeListenerRegistration<P, D, DCL> registration) {
156 return withLock(registrationLock) [|
\r
157 listeners.remove(registration.path, registration);
/**
 * Removes a commit handler registration from commitHandlers and informs every listener in
 * commitHandlerRegistrationListeners through onUnregister.
 * Called from DataCommitHandlerRegistrationImpl.removeRegistration().
 *
 * @param registration registration to remove
 */
161 protected final def removeCommitHandler(DataCommitHandlerRegistrationImpl<P, D> registration) {
\r
162 return withLock(registrationLock) [|
163 commitHandlers.remove(registration.path, registration);
\r
164 LOG.trace("Removing Commit Handler {} for path: {}",registration.instance,registration.path);
\r
// Notify registration listeners; an exception thrown by a listener is caught and logged below.
165 for(listener : commitHandlerRegistrationListeners) {
\r
167 listener.instance.onUnregister(registration);
\r
168 } catch (Exception e) {
\r
169 LOG.error("Unexpected exception in listener {} during invoking onUnregister",listener.instance,e);
\r
/**
 * Returns the (path, registration) entries of commitHandlers.
 * Unlike the registration methods, this accessor does not go through withLock(registrationLock).
 */
176 protected final def getActiveCommitHandlers() {
\r
177 return commitHandlers.entries;
\r
/**
 * For every listener path for which isAffectedBy(registeredPath, paths) holds, reads the current
 * operational and configuration data of that path and packs them, together with the path and
 * its listener registrations, into a ListenerStateCapture.
 *
 * TwoPhaseCommit.call() invokes this before asking any commit handler to commit, so the captured
 * values are the pre-commit state.
 *
 * NOTE(review): the parameter list line and the end of the pipeline are missing from this
 * excerpt; the body refers to the argument as `paths`.
 */
180 protected def /*Iterator<Entry<Collection<DataChangeListenerRegistration<P,D,DCL>>,D>>*/ affectedListenersWithInitialState(
\r
182 return withLock(registrationLock) [|
\r
183 return FluentIterable.from(listeners.asMap.entrySet).filter[key.isAffectedBy(paths)].transform [
\r
184 val operationalState = readOperationalData(key)
\r
185 val configurationState = readConfigurationData(key)
\r
186 return new ListenerStateCapture(key, value, operationalState, configurationState)
\r
/**
 * Decides whether a registration path is touched by a set of changed paths.
 *
 * Two checks are visible: exact membership of `key` in `paths`, and `key.contains(path)` for
 * each element of `paths`.
 *
 * NOTE(review): the return statements are missing from this excerpt; presumably either check
 * succeeding yields true and false is returned otherwise - confirm.
 *
 * @param key path a listener or commit handler was registered for
 * @param paths paths modified by a transaction
 */
191 protected def boolean isAffectedBy(P key, Set<P> paths) {
\r
192 if (paths.contains(key)) {
\r
195 for (path : paths) {
\r
196 if (key.contains(path)) {
\r
/**
 * Submits a transaction for asynchronous two-phase commit.
 *
 * The transaction is null-checked, moved to TransactionStatus.SUBMITED, wrapped in a
 * TwoPhaseCommit task and handed to `executor`; submittedTransactionsCount is incremented
 * before submission.
 *
 * @param transaction transaction to commit; must not be null (checkNotNull)
 * @return the Future returned by executor.submit for the commit task
 */
204 package final def Future<RpcResult<TransactionStatus>> commit(AbstractDataTransaction<P, D> transaction) {
\r
205 checkNotNull(transaction);
\r
206 transaction.changeStatus(TransactionStatus.SUBMITED);
\r
207 val task = new TwoPhaseCommit(transaction, this);
\r
208 submittedTransactionsCount.andIncrement;
\r
209 return executor.submit(task);
\r
/**
 * Holder created by AbstractDataBroker.affectedListenersWithInitialState: the listener
 * registrations of one path plus the operational and configuration data read for that path
 * at capture time.
 *
 * NOTE(review): users of this class also read a `path` member and construct it with four
 * arguments; those declarations are on lines missing from this excerpt.
 */
215 package class ListenerStateCapture<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> {
\r
// Registrations that were stored under the captured path.
221 Collection<DataChangeListenerRegistration<P, D, DCL>> listeners;
\r
// Operational data of the path as read when the capture was created.
224 D initialOperationalState;
\r
// Configuration data of the path as read when the capture was created.
227 D initialConfigurationState;
\r
/**
 * Registration handle for a data change listener; keeps a reference to the broker that
 * created it so that removal can be routed back to it.
 */
230 package class DataChangeListenerRegistration<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> extends AbstractObjectRegistration<DCL> implements ListenerRegistration<DCL> {
\r
// Broker that owns this registration.
232 AbstractDataBroker<P, D, DCL> dataBroker;
\r
// NOTE(review): only the broker assignment of the constructor body is visible in this excerpt.
237 new(P path, DCL instance, AbstractDataBroker<P, D, DCL> broker) {
\r
239 dataBroker = broker;
\r
// Removal hook: asks the owning broker to drop this registration from its listener map.
243 override protected removeRegistration() {
\r
244 dataBroker.removeListener(this);
\r
/**
 * Registration handle for a commit handler; keeps a reference to the broker that created it
 * so that removal can be routed back to it.
 */
250 package class DataCommitHandlerRegistrationImpl<P extends Path<P>, D> //
\r
251 extends AbstractObjectRegistration<DataCommitHandler<P, D>> //
\r
252 implements DataCommitHandlerRegistration<P, D> {
\r
// Broker that owns this registration.
254 AbstractDataBroker<P, D, ?> dataBroker;
\r
// NOTE(review): only the broker assignment of the constructor body is visible in this excerpt.
259 new(P path, DataCommitHandler<P, D> instance, AbstractDataBroker<P, D, ?> broker) {
\r
261 dataBroker = broker;
\r
// Removal hook: asks the owning broker to drop this registration and notify registration listeners.
265 override protected removeRegistration() {
\r
266 dataBroker.removeCommitHandler(this);
\r
\r
271 package class TwoPhaseCommit<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> implements Callable<RpcResult<TransactionStatus>> {
\r
// Logger used for per-transaction trace and error messages.
273 private static val log = LoggerFactory.getLogger(TwoPhaseCommit);
\r
// Transaction whose modifications are being committed.
275 val AbstractDataTransaction<P, D> transaction;
\r
// Broker supplying affected listeners/commit handlers, the executor and the transaction counters.
276 val AbstractDataBroker<P, D, DCL> dataBroker;
\r
/**
 * Creates the commit task for one transaction.
 *
 * @param transaction transaction to commit
 * @param broker broker that submitted the task
 */
278 new(AbstractDataTransaction<P, D> transaction, AbstractDataBroker<P, D, DCL> broker) {
\r
279 this.transaction = transaction;
\r
280 this.dataBroker = broker;
\r
/**
 * Executes the two-phase commit of `transaction`.
 *
 * Steps visible in this method:
 *  1. collect every path the transaction created, updated or removed (configuration and operational);
 *  2. capture the affected listeners together with the data currently stored under their paths;
 *  3. ask every affected commit handler for a sub-transaction via requestCommit;
 *  4. call finish() on every sub-transaction and publish the data change event.
 * If step 3 or 4 throws, the failure is logged, failedTransactionsCount is incremented, the
 * transaction is moved to FAILED and the result of rollback(...) is returned. Otherwise
 * finishedTransactionsCount is incremented, the transaction is moved to COMMITED and a
 * successful RpcResult is returned.
 *
 * NOTE(review): several structural lines of this method (the try openings and closing braces)
 * are missing from this excerpt and are intentionally not re-created here.
 */
283 override call() throws Exception {
\r
285 // get affected paths
\r
286 val affectedPaths = new HashSet<P>();
\r
288 affectedPaths.addAll(transaction.createdConfigurationData.keySet);
\r
289 affectedPaths.addAll(transaction.updatedConfigurationData.keySet);
\r
290 affectedPaths.addAll(transaction.removedConfigurationData);
\r
292 affectedPaths.addAll(transaction.createdOperationalData.keySet);
\r
293 affectedPaths.addAll(transaction.updatedOperationalData.keySet);
\r
294 affectedPaths.addAll(transaction.removedOperationalData);
\r
// Captured before any handler is asked to commit, so the capture holds the pre-commit data.
296 val listeners = dataBroker.affectedListenersWithInitialState(affectedPaths);
\r
298 val transactionId = transaction.identifier;
\r
300 log.trace("Transaction: {} Started.",transactionId);
\r
// Two arguments need two placeholders; with a single "{}" the affected paths were never logged.
301 log.trace("Transaction: {} Affected Subtrees: {}",transactionId,affectedPaths);
302 // requesting commits
\r
303 val Iterable<DataCommitHandler<P, D>> commitHandlers = dataBroker.affectedCommitHandlers(affectedPaths);
\r
304 val List<DataCommitTransaction<P, D>> handlerTransactions = new ArrayList();
\r
306 for (handler : commitHandlers) {
\r
307 handlerTransactions.add(handler.requestCommit(transaction));
\r
309 } catch (Exception e) {
\r
310 log.error("Transaction: {} Request Commit failed", transactionId,e);
\r
311 dataBroker.failedTransactionsCount.andIncrement
\r
312 transaction.changeStatus(TransactionStatus.FAILED)
313 return rollback(handlerTransactions, e);
\r
315 val List<RpcResult<Void>> results = new ArrayList();
\r
317 for (subtransaction : handlerTransactions) {
\r
318 results.add(subtransaction.finish());
\r
// Extension-style call of publishDataChangeEvent(listeners) defined in this class.
320 listeners.publishDataChangeEvent();
\r
321 } catch (Exception e) {
\r
322 log.error("Transaction: {} Finish Commit failed",transactionId, e);
\r
323 dataBroker.failedTransactionsCount.andIncrement
324 transaction.changeStatus(TransactionStatus.FAILED)
\r
325 return rollback(handlerTransactions, e);
\r
327 log.trace("Transaction: {} Finished successfully.",transactionId);
\r
328 dataBroker.finishedTransactionsCount.andIncrement;
329 transaction.changeStatus(TransactionStatus.COMMITED)
\r
330 return Rpcs.getRpcResult(true, TransactionStatus.COMMITED, Collections.emptySet());
\r
/**
 * Notifies the captured listeners about the committed transaction.
 *
 * A task is submitted to dataBroker.executor; for every capture it re-reads the configuration
 * and operational data of the captured path, builds one DataChangeEventImpl from the
 * transaction, the captured initial state and the freshly read state, and passes that event
 * to onDataChanged of every listener in the capture.
 *
 * NOTE(review): the "initial" arguments are passed configuration-first while the "updated"
 * arguments are passed operational-first - confirm this matches the DataChangeEventImpl
 * constructor. The body of the catch clause is not part of this excerpt.
 *
 * @param listeners listener captures taken before the commit
 */
334 def void publishDataChangeEvent(ImmutableList<ListenerStateCapture<P, D, DCL>> listeners) {
\r
335 dataBroker.executor.submit [|
\r
336 for (listenerSet : listeners) {
337 val updatedConfiguration = dataBroker.readConfigurationData(listenerSet.path);
338 val updatedOperational = dataBroker.readOperationalData(listenerSet.path);
340 val changeEvent = new DataChangeEventImpl(transaction, listenerSet.initialConfigurationState,
341 listenerSet.initialOperationalState, updatedOperational, updatedConfiguration);
342 for (listener : listenerSet.listeners) {
344 listener.instance.onDataChanged(changeEvent);
346 } catch (Exception e) {
/**
 * Rolls back every handler sub-transaction obtained so far and reports failure.
 *
 * @param transactions sub-transactions returned by requestCommit before the failure
 * @param e exception that triggered the rollback; it is not included in the returned result
 *          (see the FIXME below)
 * @return an unsuccessful RpcResult carrying TransactionStatus.FAILED and an empty error set
 */
354 def rollback(List<DataCommitTransaction<P, D>> transactions, Exception e) {
\r
355 for (transaction : transactions) {
\r
356 transaction.rollback()
\r
359 // FIXME return encountered error.
\r
360 return Rpcs.getRpcResult(false, TransactionStatus.FAILED, Collections.emptySet());
\r
364 public abstract class AbstractDataTransaction<P extends Path<P>, D> extends AbstractDataModification<P, D> {
\r
// Logger for transaction allocation and status transitions.
366 private static val LOG = LoggerFactory.getLogger(AbstractDataTransaction);
// Identifier given at construction; used by hashCode/equals and in log messages.
369 private val Object identifier;
\r
// Current lifecycle status; starts as NEW and is updated by changeStatus.
371 var TransactionStatus status;
\r
// Broker that commit() and the read fall-backs delegate to.
373 var AbstractDataBroker<P, D, ?> broker;
\r
/**
 * Creates a transaction in status NEW bound to the given broker.
 *
 * NOTE(review): the identifier is stored through `_identifier`; presumably this is the
 * backing field generated for the `identifier` property - confirm.
 *
 * @param identifier transaction identifier
 * @param dataBroker broker the transaction belongs to
 */
375 protected new(Object identifier,AbstractDataBroker<P, D, ?> dataBroker) {
\r
377 _identifier = identifier;
\r
378 broker = dataBroker;
\r
379 status = TransactionStatus.NEW;
\r
380 LOG.debug("Transaction {} Allocated.", identifier);
382 //listeners = new ListenerRegistry<>();
\r
/**
 * Commits this transaction by handing it to the owning broker.
 *
 * @return whatever broker.commit(this) returns
 */
385 override commit() {
\r
386 return broker.commit(this);
\r
/**
 * Reads configuration data for `path`, looking first at this transaction's own
 * updatedConfigurationData and otherwise delegating to the broker.
 *
 * NOTE(review): the body of the `local != null` branch is missing from this excerpt;
 * presumably it returns `local` - confirm.
 *
 * @param path path to read
 */
389 override readConfigurationData(P path) {
\r
390 val local = this.updatedConfigurationData.get(path);
\r
391 if(local != null) {
\r
395 return broker.readConfigurationData(path);
\r
/**
 * Reads operational data for `path`, looking first at this transaction's own
 * updatedOperationalData and otherwise delegating to the broker.
 *
 * NOTE(review): the body of the `local != null` branch is missing from this excerpt;
 * presumably it returns `local` - confirm.
 *
 * @param path path to read
 */
398 override readOperationalData(P path) {
\r
399 val local = this.updatedOperationalData.get(path);
\r
400 if(local != null) {
\r
403 return broker.readOperationalData(path);
\r
/**
 * Hash code derived from the identifier only.
 *
 * NOTE(review): equals() explicitly handles a null identifier, whereas this method
 * dereferences it unconditionally - confirm that identifier can never be null.
 */
406 override hashCode() {
\r
407 return identifier.hashCode;
\r
/**
 * Equality check that compares the runtime class, then `broker`, then `identifier`, each of
 * the two fields with an explicit branch for the case where this object's value is null.
 *
 * NOTE(review): the return statements and any checks on lines 411-414 of the original are
 * missing from this excerpt.
 *
 * @param obj object to compare with
 */
410 override equals(Object obj) {
\r
415 if (getClass() != obj.getClass())
\r
417 val other = (obj as AbstractDataTransaction<P,D>);
\r
418 if (broker == null) {
\r
419 if (other.broker != null)
\r
421 } else if (!broker.equals(other.broker))
\r
423 if (identifier == null) {
\r
424 if (other.identifier != null)
\r
426 } else if (!identifier.equals(other.identifier))
\r
/**
 * Accessor declared to return a TransactionStatus.
 * NOTE(review): the body is not part of this excerpt; presumably it returns the `status`
 * field - confirm.
 */
431 override TransactionStatus getStatus() {
\r
/**
 * Hook for subclasses; changeStatus invokes it with the new status after the status field
 * has been updated.
 *
 * @param status the status the transaction has just been moved to
 */
435 protected abstract def void onStatusChange(TransactionStatus status);
\r
/**
 * Moves the transaction to a new status: logs the transition (old and new value), stores the
 * new value and then calls onStatusChange with it.
 *
 * @param status status to move to
 */
437 public def changeStatus(TransactionStatus status) {
\r
438 LOG.debug("Transaction {} transitioned from {} to {}", identifier, this.status, status);
439 this.status = status;
\r
440 onStatusChange(status);
\r