/** * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ package org.opendaylight.controller.md.sal.common.impl.service; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import org.opendaylight.controller.md.sal.common.api.TransactionStatus; import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent; import org.opendaylight.controller.md.sal.common.api.data.DataChangeListener; import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler; import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction; import org.opendaylight.yangtools.concepts.Path; import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.common.RpcResultBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Optional; import com.google.common.base.Predicate; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList.Builder; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; public class TwoPhaseCommit
, D extends Object, DCL extends DataChangeListener
> implements
Callable transaction;
private final AbstractDataBroker dataBroker;
public TwoPhaseCommit(final AbstractDataTransaction transaction, final AbstractDataBroker broker) {
this.transaction = transaction;
this.dataBroker = broker;
}
@Override
public RpcResult changedPaths = ImmutableSet. builder().addAll(transaction.getUpdatedConfigurationData().keySet())
.addAll(transaction.getCreatedConfigurationData().keySet())
.addAll(transaction.getRemovedConfigurationData())
.addAll(transaction.getUpdatedOperationalData().keySet())
.addAll(transaction.getCreatedOperationalData().keySet())
.addAll(transaction.getRemovedOperationalData()).build();
log.trace("Transaction: {} Affected Subtrees: {}", transactionId, changedPaths);
// The transaction has no effects, let's just shortcut it
if (changedPaths.isEmpty()) {
dataBroker.getFinishedTransactionsCount().getAndIncrement();
transaction.succeeded();
log.trace("Transaction: {} Finished successfully (no effects).", transactionId);
return RpcResultBuilder. handler : commitHandlers) {
DataCommitTransaction requestCommit = handler.requestCommit(this.transaction);
if (requestCommit != null) {
handlerTransactions.add(requestCommit);
} else {
log.debug("Transaction: {}, Handler {} is not participating in transaction.", transactionId,
handler);
}
}
} catch (Exception e) {
log.error("Transaction: {} Request Commit failed", transactionId, e);
dataBroker.getFailedTransactionsCount().getAndIncrement();
this.transaction.failed();
return this.rollback(handlerTransactions, e);
}
log.trace("Transaction: {} Starting Finish.",transactionId);
final List subtransaction : handlerTransactions) {
results.add(subtransaction.finish());
}
} catch (Exception e) {
log.error("Transaction: {} Finish Commit failed", transactionId, e);
dataBroker.getFailedTransactionsCount().getAndIncrement();
transaction.failed();
return this.rollback(handlerTransactions, e);
}
dataBroker.getFinishedTransactionsCount().getAndIncrement();
transaction.succeeded();
log.trace("Transaction: {} Finished successfully.", transactionId);
captureFinalState(listeners);
log.trace("Transaction: {} Notifying listeners.", transactionId);
publishDataChangeEvent(listeners);
return RpcResultBuilder. state : listeners) {
state.setInitialConfigurationState(dataBroker.readConfigurationData(state.getPath()));
state.setInitialOperationalState(dataBroker.readOperationalData(state.getPath()));
}
}
private void captureFinalState(ImmutableList state : listeners) {
state.setFinalConfigurationState(dataBroker.readConfigurationData(state.getPath()));
state.setFinalOperationalState(dataBroker.readOperationalData(state.getPath()));
}
}
private void filterProbablyAffectedListeners(
ImmutableList listenerSet : probablyAffectedListeners) {
P affectedPath = listenerSet.getPath();
Optional originalOper = dataBroker.deepGetBySubpath(transaction.getOriginalOperationalData(),affectedPath);
Map createdOper = dataBroker.deepGetBySubpath(transaction.getCreatedOperationalData(),affectedPath);
Map updatedOper = dataBroker.deepGetBySubpath(transaction.getUpdatedOperationalData(),affectedPath);
Set removedOper = Sets.filter(transaction.getRemovedOperationalData(), dataBroker.createIsContainedPredicate(affectedPath));
return resolveChanges(affectedPath,originalOper,createdOper,updatedOper,removedOper);
}
private Optional originalConfig = dataBroker.deepGetBySubpath(transaction.getOriginalConfigurationData(),affectedPath);
Map createdConfig = dataBroker.deepGetBySubpath(transaction.getCreatedConfigurationData(),affectedPath);
Map updatedConfig = dataBroker.deepGetBySubpath(transaction.getUpdatedConfigurationData(),affectedPath);
Set removedConfig = Sets.filter(transaction.getRemovedConfigurationData(), dataBroker.createIsContainedPredicate(affectedPath));
return resolveChanges(affectedPath,originalConfig,createdConfig,updatedConfig,removedConfig);
}
private Optional originalConfig, Map createdConfig, Map updatedConfig,Set potentialDeletions) {
Predicate isContained = dataBroker.createIsContainedPredicate(affectedPath);
if(createdConfig.isEmpty() && updatedConfig.isEmpty() && potentialDeletions.isEmpty()) {
return Optional.absent();
}
RootedChangeSet changeSet = new RootedChangeSet (affectedPath,originalConfig);
changeSet.addCreated(createdConfig);
for(Entry entry : updatedConfig.entrySet()) {
if(originalConfig.containsKey(entry.getKey())) {
changeSet.addUpdated(entry);
} else {
changeSet.addCreated(entry);
}
}
for(Entry entry : originalConfig.entrySet()) {
for(P deletion : potentialDeletions) {
if(isContained.apply(deletion)) {
changeSet.addRemoval(entry.getKey());
}
}
}
if(changeSet.isChange()) {
return Optional.of(changeSet);
} else {
return Optional.absent();
}
}
public void publishDataChangeEvent(final ImmutableList listenerSet : listeners) {
DataChangeEvent changeEvent = listenerSet.createEvent(transaction);
for (final DataChangeListenerRegistration listener : listenerSet.getListeners()) {
try {
listener.getInstance().onDataChanged(changeEvent);
} catch (Exception e) {
log.error("Unhandled exception when invoking listener {}", listener, e);
}
}
}
}
};
executor.submit(notifyTask);
}
public RpcResult transaction : transactions) {
transaction.rollback();
}
return RpcResultBuilder.