*/
package org.opendaylight.openflowplugin.impl.device;
+import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
import java.math.BigInteger;
import java.util.Collection;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
import org.opendaylight.controller.md.sal.binding.api.NotificationService;
import org.opendaylight.controller.md.sal.binding.api.ReadTransaction;
+import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
import org.opendaylight.openflowjava.protocol.api.keys.MessageTypeKey;
import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceived;
import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsData;
import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsDataBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.OfpRole;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
private final MessageTranslator<PacketInMessage, PacketReceived> packetInTranslator;
private final MessageTranslator<FlowRemoved, org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowRemoved> flowRemovedTranslator;
private final TranslatorLibrary translatorLibrary;
- private Map<Long, NodeConnectorRef> nodeConnectorCache;
+ private final Map<Long, NodeConnectorRef> nodeConnectorCache;
private ItemLifeCycleRegistry itemLifeCycleSourceRegistry;
private RpcContext rpcContext;
private ExtensionConverterProvider extensionConverterProvider;
return dataBroker.newReadOnlyTransaction();
}
+ @Override
+ public void onClusterRoleChange(@CheckForNull final OfpRole role) {
+ this.onClusterRoleChange(role, true);
+ }
+
+ @Override
+ public void onInitClusterRoleChange(@CheckForNull final OfpRole role) {
+ this.onClusterRoleChange(role, false);
+ }
+
@Override
public <T extends DataObject> void writeToTransaction(final LogicalDatastoreType store,
final InstanceIdentifier<T> path, final T data) {
}
/**
- * @deprecated will be deleted
*/
@Override
- public void onDeviceDisconnectedFromCluster() {
+ public void onDeviceDisconnectedFromCluster(final boolean removeNodeFromDS) {
LOG.info("Removing device from operational and closing transaction Manager for device:{}", getDeviceState().getNodeId());
- transactionChainManager.cleanupPostClosure();
+ transactionChainManager.cleanupPostClosure(removeNodeFromDS);
+ if (removeNodeFromDS) {
+ // FIXME : it has to be hooked for some another part of code
+ final WriteTransaction write = dataBroker.newWriteOnlyTransaction();
+ write.delete(LogicalDatastoreType.OPERATIONAL, deviceState.getNodeInstanceIdentifier());
+ final CheckedFuture<Void, TransactionCommitFailedException> deleteFuture = write.submit();
+ Futures.addCallback(deleteFuture, new FutureCallback<Void>() {
+
+ @Override
+ public void onSuccess(final Void result) {
+ LOG.debug("Delete Node {} was successful", deviceState.getNodeId());
+ }
+
+ @Override
+ public void onFailure(final Throwable t) {
+ LOG.warn("Delete Node {} fail.", deviceState.getNodeId(), t);
+ }
+ });
+ }
}
@Override
public ExtensionConverterProvider getExtensionConverterProvider() {
return extensionConverterProvider;
}
+
+ private void onClusterRoleChange(@CheckForNull final OfpRole role, final boolean enableSubmit) {
+ LOG.debug("onClusterRoleChange {} for node:", role, deviceState.getNodeId());
+ Preconditions.checkArgument(role != null);
+ if (OfpRole.BECOMESLAVE.equals(role)) {
+ transactionChainManager.activateTransactionManager(enableSubmit);
+ } else if (OfpRole.BECOMEMASTER.equals(role)) {
+ transactionChainManager.deactivateTransactionManager();
+ } else {
+ LOG.warn("Unknow OFCluster Role {} for Node {}", role, deviceState.getNodeId());
+ }
+ }
}
package org.opendaylight.openflowplugin.impl.device;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
-import javax.annotation.Nonnull;
import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
private final Object txLock = new Object();
private final DataBroker dataBroker;
+// private final DeviceState deviceState;
+ @GuardedBy("txLock")
private WriteTransaction wTx;
+ @GuardedBy("txLock")
private BindingTransactionChain txChainFactory;
private boolean submitIsEnabled;
return transactionChainManagerStatus;
}
- private volatile TransactionChainManagerStatus transactionChainManagerStatus;
+ @GuardedBy("txLock")
+ private TransactionChainManagerStatus transactionChainManagerStatus;
private final KeyedInstanceIdentifier<Node, NodeKey> nodeII;
private volatile Registration managerRegistration;
this.dataBroker = Preconditions.checkNotNull(dataBroker);
this.nodeII = Preconditions.checkNotNull(nodeII);
this.managerRegistration = Preconditions.checkNotNull(managerRegistration);
- this.transactionChainManagerStatus = TransactionChainManagerStatus.WORKING;
- createTxChain(dataBroker);
+ this.transactionChainManagerStatus = TransactionChainManagerStatus.SLEEPING;
+ createTxChain();
LOG.debug("created txChainManager");
}
- private void createTxChain(final DataBroker dataBroker) {
+ @GuardedBy("txLock")
+ private void createTxChain() {
+ if (txChainFactory != null) {
+ txChainFactory.close();
+ }
txChainFactory = dataBroker.createTransactionChain(TransactionChainManager.this);
}
submitWriteTransaction();
}
+ /**
+ * Method change status for TxChainManager to {@link TransactionChainManagerStatus#WORKING} and it has to make
+ * registration for this class instance as {@link TransactionChainListener} to provide possibility a make DS
+ * transactions. Call this method for MASTER role only.
+ * @param enableSubmit - marker to be sure a WriteTransaction submit is not blocking
+ * (Blocking write is used for initialization part only)
+ */
+ public void activateTransactionManager(final boolean enableSubmit) {
+// LOG.trace("activetTransactionManager for node {} transaction submit is set to {}", deviceState.getNodeId(), enableSubmit);
+ synchronized (txLock) {
+ if (TransactionChainManagerStatus.SLEEPING.equals(transactionChainManagerStatus)) {
+// LOG.debug("Transaction Factory create {}", deviceState.getNodeId());
+ Preconditions.checkState(txChainFactory == null, "TxChainFactory survive last close.");
+ Preconditions.checkState(wTx == null, "We have some unexpected WriteTransaction.");
+ this.transactionChainManagerStatus = TransactionChainManagerStatus.WORKING;
+ createTxChain();
+ this.submitIsEnabled = enableSubmit;
+ } else {
+// LOG.debug("Transaction is active {}", deviceState.getNodeId());
+ }
+ }
+ }
+
+ /**
+ * Method change status for TxChainManger to {@link TransactionChainManagerStatus#SLEEPING} and it unregisters
+ * this class instance as {@link TransactionChainListener} so it broke a possibility to write something to DS.
+ * Call this method for SLAVE only.
+ */
+ public void deactivateTransactionManager() {
+ synchronized (txLock) {
+ if (TransactionChainManagerStatus.WORKING.equals(transactionChainManagerStatus)) {
+// LOG.debug("Submitting all transactions if we were in status WORKING for Node", deviceState.getNodeId());
+ submitWriteTransaction();
+ Preconditions.checkState(wTx == null, "We have some unexpected WriteTransaction.");
+// LOG.debug("Transaction Factory delete for Node {}", deviceState.getNodeId());
+ transactionChainManagerStatus = TransactionChainManagerStatus.SLEEPING;
+ txChainFactory.close();
+ txChainFactory = null;
+ }
+ }
+ }
boolean submitWriteTransaction() {
if (!submitIsEnabled) {
return false;
}
synchronized (txLock) {
+ Preconditions.checkState(TransactionChainManagerStatus.WORKING.equals(transactionChainManagerStatus),
+ "we have here Uncompleted Transaction for node {} and we are not MASTER", nodeII);
if (wTx == null) {
LOG.trace("nothing to commit - submit returns true");
return true;
return true;
}
+ @Deprecated
public void cancelWriteTransaction() {
// there is no cancel txn in ping-pong broker. So we need to drop the chain and recreate it.
// since the chain is created per device, there won't be any other txns other than ones we created.
<T extends DataObject> void addDeleteOperationTotTxChain(final LogicalDatastoreType store,
final InstanceIdentifier<T> path) {
final WriteTransaction writeTx = getTransactionSafely();
- writeTx.delete(store, path);
+ if (writeTx != null) {
+ writeTx.delete(store, path);
+ } else {
+ LOG.debug("WriteTx is null for node {}. Delete {} was not realized.", nodeII, path);
+ }
}
<T extends DataObject> void writeToTransaction(final LogicalDatastoreType store,
final InstanceIdentifier<T> path, final T data) {
final WriteTransaction writeTx = getTransactionSafely();
- writeTx.put(store, path, data);
+ if (writeTx != null) {
+ writeTx.put(store, path, data);
+ } else {
+ LOG.debug("WriteTx is null for node {}. Write data for {} was not realized.", nodeII, path);
+ }
}
@Override
public void onTransactionChainFailed(final TransactionChain<?, ?> chain,
final AsyncTransaction<?, ?> transaction, final Throwable cause) {
LOG.warn("txChain failed -> recreating", cause);
- recreateTxChain();
+ if (transactionChainManagerStatus.equals(TransactionChainManagerStatus.WORKING)) {
+ recreateTxChain();
+ }
}
@Override
public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
- // NOOP - only yet, here is probably place for notification to get new WriteTransaction
+ // NOOP
}
private void recreateTxChain() {
- txChainFactory.close();
- createTxChain(dataBroker);
synchronized (txLock) {
+ createTxChain();
wTx = null;
}
}
-
+ @Nullable
private WriteTransaction getTransactionSafely() {
if (wTx == null && !TransactionChainManagerStatus.SHUTTING_DOWN.equals(transactionChainManagerStatus)) {
synchronized (txLock) {
- if (wTx == null) {
+ if (wTx == null && txChainFactory != null) {
wTx = txChainFactory.newWriteOnlyTransaction();
}
}
}
/**
- * When a device disconnects from a node of the cluster, the device context gets closed. With that the txChainMgr
- * status is set to SHUTTING_DOWN and is closed.
- * When the EntityOwnershipService notifies and is derived that this was indeed the last node from which the device
- * had disconnected, then we clean the inventory.
- * Called from DeviceContext
+ * @deprecated will be removed
+ * @param removeDSNode
*/
- public void cleanupPostClosure() {
- LOG.debug("Removing node {} from operational DS.", nodeII);
+ @Deprecated
+ public void cleanupPostClosure(final boolean removeDSNode) {
synchronized (txLock) {
- final WriteTransaction writeTx;
-
- //TODO(Kamal): Fix this. This might cause two txChain Manager working on the same node.
- if (txChainFactory == null) {
- LOG.info("Creating new Txn Chain Factory for cleanup purposes - Race Condition Hazard, " +
- "Concurrent Modification Hazard, node:{}", nodeII);
- createTxChain(dataBroker);
- }
+ if (removeDSNode) {
+ LOG.info("Removing from operational DS, node {} ", nodeII);
+ final WriteTransaction writeTx = getTransactionSafely();
+ this.transactionChainManagerStatus = TransactionChainManagerStatus.SHUTTING_DOWN;
+ writeTx.delete(LogicalDatastoreType.OPERATIONAL, nodeII);
+ LOG.debug("Delete from operational DS put to write transaction. node {} ", nodeII);
+ final CheckedFuture<Void, TransactionCommitFailedException> submitsFuture = writeTx.submit();
+ LOG.info("Delete from operational DS write transaction submitted. node {} ", nodeII);
+ Futures.addCallback(submitsFuture, new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(final Void aVoid) {
+ LOG.debug("Removing from operational DS successful . node {} ", nodeII);
+ notifyReadyForNewTransactionChainAndCloseFactory();
+ }
- if (TransactionChainManagerStatus.SHUTTING_DOWN.equals(transactionChainManagerStatus)) {
- // status is already shutdown. so get the tx directly
- writeTx = txChainFactory.newWriteOnlyTransaction();
+ @Override
+ public void onFailure(final Throwable throwable) {
+ LOG.info("Attempt to close transaction chain factory failed.", throwable);
+ notifyReadyForNewTransactionChainAndCloseFactory();
+ }
+ });
+ wTx = null;
} else {
- writeTx = getTransactionSafely();
- }
-
- this.transactionChainManagerStatus = TransactionChainManagerStatus.SHUTTING_DOWN;
- writeTx.delete(LogicalDatastoreType.OPERATIONAL, nodeII);
- LOG.debug("Delete node {} from operational DS put to write transaction.", nodeII);
-
- CheckedFuture<Void, TransactionCommitFailedException> submitsFuture = writeTx.submit();
- LOG.debug("Delete node {} from operational DS write transaction submitted.", nodeII);
-
- Futures.addCallback(submitsFuture, new FutureCallback<Void>() {
- @Override
- public void onSuccess(final Void aVoid) {
- LOG.debug("Removing node {} from operational DS successful .", nodeII);
+ if (transactionChainManagerStatus.equals(TransactionChainManagerStatus.WAITING_TO_BE_SHUT)) {
+ LOG.info("This is a disconnect, but not the last node,transactionChainManagerStatus={}, node:{}",
+ transactionChainManagerStatus, nodeII);
+ // a disconnect has happened, but this is not the last node in the cluster, so just close the chain
+ this.transactionChainManagerStatus = TransactionChainManagerStatus.SHUTTING_DOWN;
notifyReadyForNewTransactionChainAndCloseFactory();
+ wTx = null;
+ } else {
+ LOG.trace("This is not a disconnect, hence we are not closing txnChainMgr,transactionChainManagerStatus={}, node:{}",
+ transactionChainManagerStatus, nodeII);
}
- @Override
- public void onFailure(final Throwable throwable) {
- LOG.info("Attempt to close transaction chain factory failed.", throwable);
- notifyReadyForNewTransactionChainAndCloseFactory();
- }
- });
- wTx = null;
+ }
}
}
+ /**
+ * @deprecated will be removed
+ */
+ @Deprecated
private void notifyReadyForNewTransactionChainAndCloseFactory() {
- if(managerRegistration == null){
- LOG.warn("managerRegistration is null");
- return;
- }
synchronized (this) {
try {
+ LOG.info("Closing registration in manager.node:{} ", nodeII);
if (managerRegistration != null) {
- LOG.debug("Closing registration in manager.");
managerRegistration.close();
}
- } catch (Exception e) {
- LOG.warn("Failed to close transaction chain manager's registration.", e);
+ } catch (final Exception e) {
+ LOG.warn("Failed to close transaction chain manager's registration..node:{} ", nodeII, e);
}
managerRegistration = null;
}
txChainFactory.close();
- txChainFactory = null;
- LOG.debug("Transaction chain factory closed.");
+ LOG.info("Transaction chain factory closed. node:{} ", nodeII);
}
@Override
public void close() {
- LOG.debug("closing txChainManager without cleanup of node {} from operational DS.", nodeII);
+ LOG.info("Setting transactionChainManagerStatus to WAITING_TO_BE_SHUT, will wait for ownershipservice to notify", nodeII);
+ // we can finish in initial phase
+ initialSubmitWriteTransaction();
synchronized (txLock) {
- this.transactionChainManagerStatus = TransactionChainManagerStatus.SHUTTING_DOWN;
- notifyReadyForNewTransactionChainAndCloseFactory();
- wTx = null;
+ if (txChainFactory != null) {
+ txChainFactory.close();
+ txChainFactory = null;
+ }
+ this.transactionChainManagerStatus = TransactionChainManagerStatus.WAITING_TO_BE_SHUT;
}
+ Preconditions.checkState(wTx == null);
+ Preconditions.checkState(txChainFactory == null);
}
public enum TransactionChainManagerStatus {
- WORKING, SHUTTING_DOWN;
+ WORKING, SLEEPING, WAITING_TO_BE_SHUT, SHUTTING_DOWN;
}
+
+
}