import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.checkerframework.checker.lock.qual.GuardedBy;
import org.checkerframework.checker.lock.qual.Holding;
import org.eclipse.jdt.annotation.NonNull;
@GuardedBy("txLock")
private TransactionChainManagerStatus transactionChainManagerStatus = TransactionChainManagerStatus.SLEEPING;
+ private ReadWriteLock readWriteTransactionLock = new ReentrantReadWriteLock();
public TransactionChainManager(@NonNull final DataBroker dataBroker,
@NonNull final String deviceIdentifier) {
*/
SHUTTING_DOWN
}
+
+ public void acquireWriteTransactionLock() {
+ readWriteTransactionLock.writeLock().lock();
+ }
+
+ public void releaseWriteTransactionLock() {
+ readWriteTransactionLock.writeLock().unlock();
+ }
+
}
String datapathId = deviceInfo.getDatapathId().toString().intern();
queuedNotificationManager.submitNotification(datapathId, () -> {
try {
+ acquireWriteTransactionLock();
final FlowCapableNodeConnector flowCapableNodeConnector = portStatusTranslator
.translate(portStatusMessage, getDeviceInfo(), null);
OF_EVENT_LOG.debug("Node Connector Status, Node: {}, PortNumber: {}, PortName: {}, Reason: {}",
} catch (final Exception e) {
LOG.warn("Error processing port status message for port {} on device {}",
portStatusMessage.getPortNo(), datapathId, e);
+ } finally {
+ releaseWriteTransactionLock();
}
});
}
return deviceInfo.getServiceIdentifier();
}
+ @Override
+ public void acquireWriteTransactionLock() {
+ transactionChainManager.acquireWriteTransactionLock();
+ }
+
+ @Override
+ public void releaseWriteTransactionLock() {
+ transactionChainManager.releaseWriteTransactionLock();
+ }
+
@Override
public void close() {
// Close all datastore registries and transactions
}), MoreExecutors.directExecutor());
}
+ @SuppressWarnings("checkstyle:IllegalCatch")
private static boolean processStatistics(final MultipartType type, final List<? extends DataContainer> statistics,
final TxFacade txFacade, final DeviceRegistry deviceRegistry,
final DeviceInfo deviceInfo,
final MultipartWriterProvider statisticsWriterProvider) {
final InstanceIdentifier<FlowCapableNode> instanceIdentifier = deviceInfo.getNodeInstanceIdentifier()
.augmentation(FlowCapableNode.class);
+ try {
+ txFacade.acquireWriteTransactionLock();
+ switch (type) {
+ case OFPMPFLOW:
+ deleteAllKnownFlows(txFacade, instanceIdentifier, deviceRegistry.getDeviceFlowRegistry());
+ deviceRegistry.getDeviceFlowRegistry().processMarks();
+ break;
+ case OFPMPMETERCONFIG:
+ deleteAllKnownMeters(txFacade, instanceIdentifier, deviceRegistry.getDeviceMeterRegistry());
+ deviceRegistry.getDeviceMeterRegistry().processMarks();
+ break;
+ case OFPMPGROUPDESC:
+ deleteAllKnownGroups(txFacade, instanceIdentifier, deviceRegistry.getDeviceGroupRegistry());
+ deviceRegistry.getDeviceGroupRegistry().processMarks();
+ break;
+ default:
+ // no operation
+ }
- switch (type) {
- case OFPMPFLOW:
- deleteAllKnownFlows(txFacade, instanceIdentifier, deviceRegistry.getDeviceFlowRegistry());
- deviceRegistry.getDeviceFlowRegistry().processMarks();
- break;
- case OFPMPMETERCONFIG:
- deleteAllKnownMeters(txFacade, instanceIdentifier, deviceRegistry.getDeviceMeterRegistry());
- deviceRegistry.getDeviceMeterRegistry().processMarks();
- break;
- case OFPMPGROUPDESC:
- deleteAllKnownGroups(txFacade, instanceIdentifier, deviceRegistry.getDeviceGroupRegistry());
- deviceRegistry.getDeviceGroupRegistry().processMarks();
- break;
- default:
- // no operation
- }
-
- if (writeStatistics(type, statistics, deviceInfo, statisticsWriterProvider)) {
- txFacade.submitTransaction();
+ if (writeStatistics(type, statistics, deviceInfo, statisticsWriterProvider)) {
+ txFacade.submitTransaction();
- LOG.debug("Stats reply added to transaction for node {} of type {}", deviceInfo.getNodeId(), type);
- return true;
+ LOG.debug("Stats reply added to transaction for node {} of type {}", deviceInfo.getNodeId(), type);
+ return true;
+ }
+ } catch (Exception e) {
+ LOG.error("Exception while writing statistics to operational inventory for the device {}",
+ deviceInfo.getLOGValue(), e);
+ } finally {
+ txFacade.releaseWriteTransactionLock();
}
LOG.warn("Stats processing of type {} for node {} " + "failed during write-to-tx step", type, deviceInfo);