2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.md.sal.common.impl.service
\r
10 import com.google.common.collect.FluentIterable
\r
11 import com.google.common.collect.HashMultimap
\r
12 import com.google.common.collect.ImmutableList
\r
13 import com.google.common.collect.Multimap
\r
14 import java.util.ArrayList
\r
15 import java.util.Arrays
\r
16 import java.util.Collection
\r
17 import java.util.Collections
\r
18 import java.util.HashSet
\r
19 import java.util.List
\r
20 import java.util.Set
\r
21 import java.util.concurrent.Callable
\r
22 import java.util.concurrent.ExecutorService
\r
23 import java.util.concurrent.Future
\r
24 import java.util.concurrent.atomic.AtomicLong
\r
25 import org.opendaylight.controller.md.sal.common.api.RegistrationListener
\r
26 import org.opendaylight.controller.md.sal.common.api.TransactionStatus
\r
27 import org.opendaylight.controller.md.sal.common.api.data.DataChangeListener
\r
28 import org.opendaylight.controller.md.sal.common.api.data.DataChangePublisher
\r
29 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler
\r
30 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction
\r
31 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration
\r
32 import org.opendaylight.controller.md.sal.common.api.data.DataModificationTransactionFactory
\r
33 import org.opendaylight.controller.md.sal.common.api.data.DataProvisionService
\r
34 import org.opendaylight.controller.md.sal.common.api.data.DataReader
\r
35 import org.opendaylight.controller.md.sal.common.impl.AbstractDataModification
\r
36 import org.opendaylight.controller.md.sal.common.impl.routing.AbstractDataReadRouter
\r
37 import org.opendaylight.controller.sal.common.util.Rpcs
\r
38 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration
\r
39 import org.opendaylight.yangtools.concepts.CompositeObjectRegistration
\r
40 import org.opendaylight.yangtools.concepts.ListenerRegistration
\r
41 import org.opendaylight.yangtools.concepts.Path
\r
42 import org.opendaylight.yangtools.concepts.util.ListenerRegistry
\r
43 import org.opendaylight.yangtools.yang.common.RpcResult
\r
44 import org.slf4j.LoggerFactory
\r
46 import static com.google.common.base.Preconditions.*
\rimport org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent
47 import com.google.common.collect.Multimaps
49 abstract class AbstractDataBroker<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> implements DataModificationTransactionFactory<P, D>, //
\r
50 DataReader<P, D>, //
\r
51 DataChangePublisher<P, D, DCL>, //
\r
52 DataProvisionService<P, D> {
\r
54 private static val LOG = LoggerFactory.getLogger(AbstractDataBroker);
\r
57 var ExecutorService executor;
\r
60 var AbstractDataReadRouter<P, D> dataReadRouter;
\r
63 private val AtomicLong submittedTransactionsCount = new AtomicLong;
\r
66 private val AtomicLong failedTransactionsCount = new AtomicLong
\r
69 private val AtomicLong finishedTransactionsCount = new AtomicLong
\r
71 Multimap<P, DataChangeListenerRegistration<P, D, DCL>> listeners = Multimaps.synchronizedSetMultimap(HashMultimap.create());
\r
72 Multimap<P, DataCommitHandlerRegistrationImpl<P, D>> commitHandlers = Multimaps.synchronizedSetMultimap(HashMultimap.create());
\r
74 val ListenerRegistry<RegistrationListener<DataCommitHandlerRegistration<P,D>>> commitHandlerRegistrationListeners = new ListenerRegistry();
\r
/**
 * Collects the commit handler instances whose registration path is affected
 * by at least one of the supplied paths (as decided by isAffectedBy).
 *
 * NOTE(review): commitHandlers is created with Multimaps.synchronizedSetMultimap;
 * Guava documents that iterating its collection views needs manual
 * synchronization on the multimap - confirm whether that is required here.
 *
 * @param paths paths touched by a transaction
 * @return list of the matching handler instances
 */
protected def affectedCommitHandlers(HashSet<P> paths) {
    val registrationsByPath = commitHandlers.asMap.entrySet
    return FluentIterable.from(registrationsByPath) //
        .filter[entry|entry.key.isAffectedBy(paths)] //
        .transformAndConcat[entry|entry.value] //
        .transform[registration|registration.instance] //
        .toList()
}
\r
/**
 * Reads configuration data for the given path by delegating to the
 * data read router.
 *
 * @param path path to read
 * @return whatever dataReadRouter.readConfigurationData returns for the path
 */
override final readConfigurationData(P path) {
    dataReadRouter.readConfigurationData(path)
}
\r
/**
 * Reads operational data for the given path by delegating to the
 * data read router.
 *
 * @param path path to read
 * @return whatever dataReadRouter.readOperationalData returns for the path
 */
override final readOperationalData(P path) {
    dataReadRouter.readOperationalData(path)
}
\r
/**
 * Registers a commit handler for a path and notifies every commit handler
 * registration listener about the new registration.
 *
 * A listener that throws is logged and does not prevent the remaining
 * listeners from being notified.
 *
 * @param path path the handler is registered under
 * @param commitHandler handler to register
 * @return the newly created registration
 */
override final registerCommitHandler(P path, DataCommitHandler<P, D> commitHandler) {
    val handlerRegistration = new DataCommitHandlerRegistrationImpl(path, commitHandler, this)
    commitHandlers.put(path, handlerRegistration)
    LOG.trace("Registering Commit Handler {} for path: {}", commitHandler, path)
    for (registrationListener : commitHandlerRegistrationListeners) {
        try {
            registrationListener.instance.onRegister(handlerRegistration)
        } catch (Exception e) {
            LOG.error("Unexpected exception in listener {} during invoking onRegister", registrationListener.instance, e)
        }
    }
    return handlerRegistration
}
\r
/**
 * Registers a data change listener for a path and immediately delivers an
 * initial event built from the configuration and operational data currently
 * readable at that path.
 *
 * The registration is stored before the initial event is delivered.
 *
 * @param path path the listener is interested in
 * @param listener listener to register
 * @return the newly created registration
 */
override final def registerDataChangeListener(P path, DCL listener) {
    val registration = new DataChangeListenerRegistration(path, listener, this)
    listeners.put(path, registration)
    val configSnapshot = dataReadRouter.readConfigurationData(path)
    val operationalSnapshot = dataReadRouter.readOperationalData(path)
    val initialEvent = createInitialListenerEvent(path, configSnapshot, operationalSnapshot)
    listener.onDataChanged(initialEvent)
    return registration
}
\r
/**
 * Registers the reader with the data read router as both a configuration
 * reader and an operational reader for the given path.
 *
 * @param path path the reader serves
 * @param reader reader to register
 * @return a composite registration wrapping both underlying registrations
 */
final def registerDataReader(P path, DataReader<P, D> reader) {
    val configurationRegistration = dataReadRouter.registerConfigurationReader(path, reader)
    val operationalRegistration = dataReadRouter.registerOperationalReader(path, reader)
    val parts = Arrays.asList(configurationRegistration, operationalRegistration)
    return new CompositeObjectRegistration(reader, parts)
}
\r
/**
 * Adds a listener that is notified when commit handlers are registered
 * or unregistered on this broker.
 *
 * @param commitHandlerListener listener to add
 * @return the registration produced by the listener registry
 */
override registerCommitHandlerListener(RegistrationListener<DataCommitHandlerRegistration<P, D>> commitHandlerListener) {
    return commitHandlerRegistrationListeners.register(commitHandlerListener)
}
\r
/**
 * Builds the event delivered to a listener right after it is registered.
 *
 * This implementation does not use the path argument; it only wraps the
 * two supplied data snapshots.
 *
 * @param path path the listener was registered for
 * @param initialConfig configuration data read at registration time
 * @param initialOperational operational data read at registration time
 * @return an InitialDataChangeEventImpl wrapping both snapshots
 */
protected def DataChangeEvent<P,D> createInitialListenerEvent(P path,D initialConfig,D initialOperational) {
    new InitialDataChangeEventImpl<P, D>(initialConfig, initialOperational)
}
\r
/**
 * Drops a data change listener registration from the listeners multimap,
 * using the path stored in the registration as the key.
 *
 * @param registration registration to remove
 * @return the result of the multimap remove call
 */
protected final def removeListener(DataChangeListenerRegistration<P, D, DCL> registration) {
    val registeredPath = registration.path
    listeners.remove(registeredPath, registration)
}
\r
/**
 * Drops a commit handler registration and notifies every commit handler
 * registration listener that it was unregistered.
 *
 * A listener that throws is logged and does not prevent the remaining
 * listeners from being notified.
 *
 * @param registration registration to remove
 */
protected final def removeCommitHandler(DataCommitHandlerRegistrationImpl<P, D> registration) {
    commitHandlers.remove(registration.path, registration)

    LOG.trace("Removing Commit Handler {} for path: {}", registration.instance, registration.path)
    for (registrationListener : commitHandlerRegistrationListeners) {
        try {
            registrationListener.instance.onUnregister(registration)
        } catch (Exception e) {
            LOG.error("Unexpected exception in listener {} during invoking onUnregister", registrationListener.instance, e)
        }
    }
}
\r
/**
 * Exposes the current (path, registration) entries of the commit handler
 * multimap.
 *
 * @return the entries view of commitHandlers
 */
protected final def getActiveCommitHandlers() {
    commitHandlers.entries
}
\r
/**
 * For every listener path affected by the supplied paths, captures the
 * listeners registered under it together with the operational and
 * configuration data readable at that path right now.
 *
 * Operational data is read before configuration data for each path.
 *
 * @param paths paths touched by a transaction
 * @return list with one ListenerStateCapture per affected listener path
 */
protected def affectedListenersWithInitialState(HashSet<P> paths) {
    val affectedEntries = FluentIterable.from(listeners.asMap.entrySet) //
        .filter[entry|entry.key.isAffectedBy(paths)]
    return affectedEntries.transform [ entry |
        val listenerPath = entry.key
        val operationalState = readOperationalData(listenerPath)
        val configurationState = readConfigurationData(listenerPath)
        new ListenerStateCapture(listenerPath, entry.value, operationalState, configurationState)
    ].toList()
}
\r
/**
 * Decides whether a registration path is affected by a set of changed paths.
 *
 * The key is affected when it is itself a member of the set, or when
 * key.contains(path) holds for at least one member of the set.
 *
 * @param key registration path being tested
 * @param paths changed paths
 * @return true when the key is affected, false otherwise
 */
protected def boolean isAffectedBy(P key, Set<P> paths) {
    return paths.contains(key) || paths.exists[changed|key.contains(changed)]
}
\r
/**
 * Submits a transaction for asynchronous two-phase commit.
 *
 * The transaction is moved to SUBMITED, wrapped in a TwoPhaseCommit task,
 * counted as submitted and handed to the executor.
 *
 * @param transaction transaction to commit; must not be null
 * @return future of the commit result produced by the executor
 */
package final def Future<RpcResult<TransactionStatus>> commit(AbstractDataTransaction<P, D> transaction) {
    checkNotNull(transaction)
    transaction.changeStatus(TransactionStatus.SUBMITED)
    val commitTask = new TwoPhaseCommit(transaction, this)
    submittedTransactionsCount.getAndIncrement()
    return executor.submit(commitTask)
}
\r
190 package class ListenerStateCapture<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> {
\r
196 Collection<DataChangeListenerRegistration<P, D, DCL>> listeners;
\r
199 D initialOperationalState;
\r
202 D initialConfigurationState;
\r
205 package class DataChangeListenerRegistration<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> extends AbstractObjectRegistration<DCL> implements ListenerRegistration<DCL> {
\r
207 AbstractDataBroker<P, D, DCL> dataBroker;
\r
212 new(P path, DCL instance, AbstractDataBroker<P, D, DCL> broker) {
\r
214 dataBroker = broker;
\r
218 override protected removeRegistration() {
\r
219 dataBroker.removeListener(this);
\r
225 package class DataCommitHandlerRegistrationImpl<P extends Path<P>, D> //
\r
226 extends AbstractObjectRegistration<DataCommitHandler<P, D>> //
\r
227 implements DataCommitHandlerRegistration<P, D> {
\r
229 AbstractDataBroker<P, D, ?> dataBroker;
\r
234 new(P path, DataCommitHandler<P, D> instance, AbstractDataBroker<P, D, ?> broker) {
\r
236 dataBroker = broker;
\r
240 override protected removeRegistration() {
\r
241 dataBroker.removeCommitHandler(this);
\r
246 package class TwoPhaseCommit<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> implements Callable<RpcResult<TransactionStatus>> {
\r
248 private static val log = LoggerFactory.getLogger(TwoPhaseCommit);
\r
250 val AbstractDataTransaction<P, D> transaction;
\r
251 val AbstractDataBroker<P, D, DCL> dataBroker;
\r
/**
 * Creates a commit task for one transaction.
 *
 * @param transaction transaction to be committed by call()
 * @param broker broker that supplies handlers, listeners and counters
 */
new(AbstractDataTransaction<P, D> transaction, AbstractDataBroker<P, D, DCL> broker) {
    dataBroker = broker
    this.transaction = transaction
}
\r
/**
 * Runs the two-phase commit for the wrapped transaction.
 *
 * Steps, in order: collect all touched paths, capture the state of affected
 * listeners, ask every affected commit handler for a sub-transaction, finish
 * every sub-transaction, then publish change events. A failure while
 * requesting or finishing increments the failed counter and returns the
 * result of rollback over the sub-transactions obtained so far.
 *
 * @return a successful COMMITED result, or the result of rollback on failure
 */
override call() throws Exception {

    // Every path created, updated or removed by the transaction, in both data stores.
    val affectedPaths = new HashSet<P>()
    affectedPaths.addAll(transaction.createdConfigurationData.keySet)
    affectedPaths.addAll(transaction.updatedConfigurationData.keySet)
    affectedPaths.addAll(transaction.removedConfigurationData)
    affectedPaths.addAll(transaction.createdOperationalData.keySet)
    affectedPaths.addAll(transaction.updatedOperationalData.keySet)
    affectedPaths.addAll(transaction.removedOperationalData)

    // Listener state is captured before any handler is asked to commit.
    val capturedListeners = dataBroker.affectedListenersWithInitialState(affectedPaths)

    val txId = transaction.identifier
    log.trace("Transaction: {} Started.", txId)

    // Phase one: request a sub-transaction from each affected handler.
    val Iterable<DataCommitHandler<P, D>> affectedHandlers = dataBroker.affectedCommitHandlers(affectedPaths)
    val List<DataCommitTransaction<P, D>> pending = new ArrayList()
    try {
        for (handler : affectedHandlers) {
            pending.add(handler.requestCommit(transaction))
        }
    } catch (Exception e) {
        log.error("Transaction: {} Request Commit failed", txId, e)
        dataBroker.failedTransactionsCount.andIncrement
        return rollback(pending, e)
    }

    // Phase two: finish each sub-transaction, then notify listeners.
    // The finish results are collected but not inspected afterwards.
    val List<RpcResult<Void>> finishResults = new ArrayList()
    try {
        for (subtransaction : pending) {
            finishResults.add(subtransaction.finish())
        }
        capturedListeners.publishDataChangeEvent()
    } catch (Exception e) {
        log.error("Transaction: {} Finish Commit failed", txId, e)
        dataBroker.failedTransactionsCount.andIncrement
        return rollback(pending, e)
    }

    log.trace("Transaction: {} Finished successfully.", txId)
    dataBroker.finishedTransactionsCount.andIncrement
    return Rpcs.getRpcResult(true, TransactionStatus.COMMITED, Collections.emptySet())
}
\r
/**
 * Delivers a data change event to every captured listener.
 *
 * For each capture the current configuration and operational data are read
 * from the broker and combined with the captured initial state into one
 * event, which is then passed to each listener of that capture.
 *
 * A listener that throws is reported through the class logger (previously
 * the stack trace was printed to standard error) and does not prevent the
 * remaining listeners from being notified.
 *
 * @param listeners listener captures taken before the commit
 */
def void publishDataChangeEvent(ImmutableList<ListenerStateCapture<P, D, DCL>> listeners) {
    for (listenerSet : listeners) {
        val updatedConfiguration = dataBroker.readConfigurationData(listenerSet.path);
        val updatedOperational = dataBroker.readOperationalData(listenerSet.path);

        val changeEvent = new DataChangeEventImpl(transaction, listenerSet.initialConfigurationState,
            listenerSet.initialOperationalState, updatedOperational, updatedConfiguration);
        for (listener : listenerSet.listeners) {
            try {
                listener.instance.onDataChanged(changeEvent);
            } catch (Exception e) {
                // Use the logger, like the other listener callbacks in this file.
                log.error("Unexpected exception in listener {} during invoking onDataChanged", listener.instance, e);
            }
        }
    }
}
\r
/**
 * Rolls back every supplied sub-transaction and reports the commit as failed.
 *
 * Each rollback is attempted independently: if one sub-transaction throws,
 * the failure is logged and the remaining sub-transactions are still rolled
 * back (previously the first exception aborted the loop).
 *
 * @param transactions sub-transactions obtained from commit handlers so far
 * @param e the exception that triggered the rollback; currently not reported
 * @return a failed result carrying TransactionStatus.FAILED and no errors
 */
def rollback(List<DataCommitTransaction<P, D>> transactions, Exception e) {
    for (subtransaction : transactions) {
        try {
            subtransaction.rollback()
        } catch (Exception rollbackFailure) {
            // Keep going so the other handlers still get a chance to roll back.
            log.error("Transaction: {} Rollback of subtransaction failed", transaction.identifier, rollbackFailure);
        }
    }

    // FIXME return encountered error.
    return Rpcs.getRpcResult(false, TransactionStatus.FAILED, Collections.emptySet());
}
\r
333 public abstract class AbstractDataTransaction<P extends Path<P>, D> extends AbstractDataModification<P, D> {
\r
335 private static val LOG = LoggerFactory.getLogger(AbstractDataTransaction);
338 private val Object identifier;
\r
340 var TransactionStatus status;
\r
342 var AbstractDataBroker<P, D, ?> broker;
\r
344 protected new(Object identifier,AbstractDataBroker<P, D, ?> dataBroker) {
\r
346 _identifier = identifier;
\r
347 broker = dataBroker;
\r
348 status = TransactionStatus.NEW;
\r
349 LOG.debug("Transaction {} Allocated.", identifier);
351 //listeners = new ListenerRegistry<>();
\r
/**
 * Commits this transaction by handing it to the owning broker.
 *
 * @return whatever broker.commit returns for this transaction
 */
override commit() {
    broker.commit(this)
}
\r
/**
 * Reads configuration data, preferring a value updated in this transaction.
 *
 * When this transaction holds no updated value for the path, the read is
 * forwarded to the broker.
 *
 * @param path path to read
 * @return the locally updated value if present, otherwise the broker's value
 */
override readConfigurationData(P path) {
    return this.updatedConfigurationData.get(path) ?: broker.readConfigurationData(path)
}
\r
/**
 * Reads operational data, preferring a value updated in this transaction.
 *
 * When this transaction holds no updated value for the path, the read is
 * forwarded to the broker.
 *
 * @param path path to read
 * @return the locally updated value if present, otherwise the broker's value
 */
override readOperationalData(P path) {
    return this.updatedOperationalData.get(path) ?: broker.readOperationalData(path)
}
\r
/**
 * Hash code derived from the transaction identifier.
 *
 * equals in this class explicitly tolerates a null identifier, so a null
 * identifier hashes to 0 here instead of raising a NullPointerException.
 *
 * @return the identifier's hash code, or 0 when the identifier is null
 */
override hashCode() {
    if (identifier != null) {
        return identifier.hashCode;
    }
    return 0;
}
\r
379 override equals(Object obj) {
\r
384 if (getClass() != obj.getClass())
\r
386 val other = (obj as AbstractDataTransaction<P,D>);
\r
387 if (broker == null) {
\r
388 if (other.broker != null)
\r
390 } else if (!broker.equals(other.broker))
\r
392 if (identifier == null) {
\r
393 if (other.identifier != null)
\r
395 } else if (!identifier.equals(other.identifier))
\r
400 override TransactionStatus getStatus() {
\r
/**
 * Hook for subclasses, invoked by changeStatus after the status field
 * has been updated.
 *
 * @param status the status that was passed to changeStatus
 */
protected abstract def void onStatusChange(TransactionStatus status);
\r
/**
 * Moves the transaction to a new status.
 *
 * The transition is logged at debug level, the status field is updated and
 * onStatusChange is then invoked with the new status.
 *
 * @param status status to switch to
 */
public def changeStatus(TransactionStatus status) {
    val previousStatus = this.status
    LOG.debug("Transaction {} transitioned from {} to {}", identifier, previousStatus, status)
    this.status = status
    onStatusChange(status)
}
\r