import com.google.common.collect.FluentIterable;
import java.util.Set
import com.google.common.collect.ImmutableList
+import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration
+import org.opendaylight.controller.md.sal.common.api.RegistrationListener
+import org.opendaylight.yangtools.concepts.util.ListenerRegistry
+import java.util.concurrent.atomic.AtomicLong
abstract class AbstractDataBroker<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> implements DataModificationTransactionFactory<P, D>, //
DataReader<P, D>, //
DataChangePublisher<P, D, DCL>, //
DataProvisionService<P, D> {
+ private static val LOG = LoggerFactory.getLogger(AbstractDataBroker);
+
@Property
var ExecutorService executor;
var AbstractDataReadRouter<P, D> dataReadRouter;
Multimap<P, DataChangeListenerRegistration<P, D, DCL>> listeners = HashMultimap.create();
- Multimap<P, DataCommitHandlerRegistration<P, D>> commitHandlers = HashMultimap.create();
-
+ Multimap<P, DataCommitHandlerRegistrationImpl<P, D>> commitHandlers = HashMultimap.create();
+
+ val ListenerRegistry<RegistrationListener<DataCommitHandlerRegistration<P,D>>> commitHandlerRegistrationListeners = new ListenerRegistry();
public new() {
}
protected def /*Iterator<Entry<Collection<DataChangeListenerRegistration<P,D,DCL>>,D>>*/ affectedCommitHandlers(
HashSet<P> paths) {
- return FluentIterable.from(commitHandlers.asMap.entrySet)
- .filter[key.isAffectedBy(paths)] //
- .transformAndConcat [value] //
- .transform[instance].toList()
+ return FluentIterable.from(commitHandlers.asMap.entrySet).filter[key.isAffectedBy(paths)] //
+ .transformAndConcat[value] //
+ .transform[instance].toList()
}
override final readConfigurationData(P path) {
}
override final registerCommitHandler(P path, DataCommitHandler<P, D> commitHandler) {
- val registration = new DataCommitHandlerRegistration(path, commitHandler, this);
+ val registration = new DataCommitHandlerRegistrationImpl(path, commitHandler, this);
commitHandlers.put(path, registration)
+ LOG.info("Registering Commit Handler {} for path: {}",commitHandler,path);
+ for(listener : commitHandlerRegistrationListeners) {
+ try {
+ listener.instance.onRegister(registration);
+ } catch (Exception e) {
+ LOG.error("Unexpected exception in listener {} during invoking onRegister",listener.instance,e);
+ }
+ }
return registration;
}
return new CompositeObjectRegistration(reader, Arrays.asList(confReg, dataReg));
}
+
+ override registerCommitHandlerListener(RegistrationListener<DataCommitHandlerRegistration<P, D>> commitHandlerListener) {
+ val ret = commitHandlerRegistrationListeners.register(commitHandlerListener);
+
+ return ret;
+ }
+
protected final def removeListener(DataChangeListenerRegistration<P, D, DCL> registration) {
listeners.remove(registration.path, registration);
}
- protected final def removeCommitHandler(DataCommitHandlerRegistration<P, D> registration) {
+ protected final def removeCommitHandler(DataCommitHandlerRegistrationImpl<P, D> registration) {
commitHandlers.remove(registration.path, registration);
+
+ LOG.info("Removing Commit Handler {} for path: {}",registration.instance,registration.path);
+ for(listener : commitHandlerRegistrationListeners) {
+ try {
+ listener.instance.onUnregister(registration);
+ } catch (Exception e) {
+ LOG.error("Unexpected exception in listener {} during invoking onUnregister",listener.instance,e);
+ }
+ }
}
protected final def getActiveCommitHandlers() {
}
@Data
-package class ListenerStateCapture<P extends Path<P>, D,DCL extends DataChangeListener<P, D>> {
+package class ListenerStateCapture<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> {
@Property
P path;
}
-package class DataCommitHandlerRegistration<P extends Path<P>, D> extends AbstractObjectRegistration<DataCommitHandler<P, D>> {
+package class DataCommitHandlerRegistrationImpl<P extends Path<P>, D> //
+extends AbstractObjectRegistration<DataCommitHandler<P, D>> //
+implements DataCommitHandlerRegistration<P, D> {
AbstractDataBroker<P, D, ?> dataBroker;
dataBroker.removeCommitHandler(this);
dataBroker = null;
}
-
}
-package class TwoPhaseCommit<P extends Path<P>, D,DCL extends DataChangeListener<P, D>> implements Callable<RpcResult<TransactionStatus>> {
+package class TwoPhaseCommit<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> implements Callable<RpcResult<TransactionStatus>> {
private static val log = LoggerFactory.getLogger(TwoPhaseCommit);
val listeners = dataBroker.affectedListenersWithInitialState(affectedPaths);
+ val transactionId = transaction.identifier;
+
+ log.info("Transaction: {} Started.",transactionId);
// requesting commits
- val Iterable<DataCommitHandler<P, D>> commitHandlers = dataBroker.affectedCommitHandlers(affectedPaths);
+ val Iterable<DataCommitHandler<P, D>> commitHandlers = dataBroker.affectedCommitHandlers(affectedPaths);
val List<DataCommitTransaction<P, D>> handlerTransactions = new ArrayList();
try {
for (handler : commitHandlers) {
handlerTransactions.add(handler.requestCommit(transaction));
}
} catch (Exception e) {
- log.error("Request Commit failded", e);
+ log.error("Transaction: {} Request Commit failed", transactionId,e);
return rollback(handlerTransactions, e);
}
val List<RpcResult<Void>> results = new ArrayList();
}
listeners.publishDataChangeEvent();
} catch (Exception e) {
- log.error("Finish Commit failed", e);
+ log.error("Transaction: {} Finish Commit failed",transactionId, e);
return rollback(handlerTransactions, e);
}
-
-
+ log.info("Transaction: {} Finished succesfully.",transactionId);
return Rpcs.getRpcResult(true, TransactionStatus.COMMITED, Collections.emptySet());
}
-
- def void publishDataChangeEvent(ImmutableList<ListenerStateCapture<P, D,DCL>> listeners) {
- for(listenerSet : listeners) {
+
+ def void publishDataChangeEvent(ImmutableList<ListenerStateCapture<P, D, DCL>> listeners) {
+ for (listenerSet : listeners) {
val updatedConfiguration = dataBroker.readConfigurationData(listenerSet.path);
val updatedOperational = dataBroker.readOperationalData(listenerSet.path);
-
- val changeEvent = new DataChangeEventImpl(transaction,listenerSet.initialConfigurationState,listenerSet.initialOperationalState,updatedOperational,updatedConfiguration);
- for(listener : listenerSet.listeners) {
+
+ val changeEvent = new DataChangeEventImpl(transaction, listenerSet.initialConfigurationState,
+ listenerSet.initialOperationalState, updatedOperational, updatedConfiguration);
+ for (listener : listenerSet.listeners) {
try {
listener.instance.onDataChanged(changeEvent);
-
+
} catch (Exception e) {
e.printStackTrace();
}
return Rpcs.getRpcResult(false, TransactionStatus.FAILED, Collections.emptySet());
}
}
+
public abstract class AbstractDataTransaction<P extends Path<P>, D> extends AbstractDataModification<P, D> {
@Property
var AbstractDataBroker<P, D, ?> broker;
- protected new(AbstractDataBroker<P, D, ?> dataBroker) {
+ protected new(Object identifier,AbstractDataBroker<P, D, ?> dataBroker) {
super(dataBroker);
- _identifier = new Object();
+ _identifier = identifier;
broker = dataBroker;
status = TransactionStatus.NEW;