2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.openflowplugin.impl.device;
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.Preconditions;
13 import com.google.common.util.concurrent.CheckedFuture;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import javax.annotation.Nonnull;
17 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
18 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
19 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
20 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
21 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
22 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
23 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
24 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
27 import org.opendaylight.yangtools.concepts.Registration;
28 import org.opendaylight.yangtools.yang.binding.DataObject;
29 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
30 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
36 * org.opendaylight.openflowplugin.impl.device
38 * Package protected class for controlling {@link WriteTransaction} life cycle. It is
39 * a {@link TransactionChainListener} and provide package protected methods for writeToTransaction
40 * method (wrapped {@link WriteTransaction#put(LogicalDatastoreType, InstanceIdentifier, DataObject)})
41 * and submitTransaction method (wrapped {@link WriteTransaction#submit()})
43 * @author <a href="mailto:vdemcak@cisco.com">Vaclav Demcak</a>
45 * Created: Apr 2, 2015
47 class TransactionChainManager implements TransactionChainListener, AutoCloseable {
49 private static final Logger LOG = LoggerFactory.getLogger(TransactionChainManager.class);
51 private final Object txLock = new Object();
53 private final DataBroker dataBroker;
54 private WriteTransaction wTx;
55 private BindingTransactionChain txChainFactory;
56 private boolean submitIsEnabled;
58 public TransactionChainManagerStatus getTransactionChainManagerStatus() {
59 return transactionChainManagerStatus;
62 private TransactionChainManagerStatus transactionChainManagerStatus;
63 private ReadyForNewTransactionChainHandler readyForNewTransactionChainHandler;
64 private final KeyedInstanceIdentifier<Node, NodeKey> nodeII;
65 private volatile Registration managerRegistration;
67 TransactionChainManager(@Nonnull final DataBroker dataBroker,
68 @Nonnull final KeyedInstanceIdentifier<Node, NodeKey> nodeII,
69 @Nonnull final Registration managerRegistration) {
70 this.dataBroker = Preconditions.checkNotNull(dataBroker);
71 this.nodeII = Preconditions.checkNotNull(nodeII);
72 this.managerRegistration = Preconditions.checkNotNull(managerRegistration);
73 this.transactionChainManagerStatus = TransactionChainManagerStatus.WORKING;
74 createTxChain(dataBroker);
75 LOG.debug("created txChainManager");
78 private void createTxChain(final DataBroker dataBroker) {
79 txChainFactory = dataBroker.createTransactionChain(TransactionChainManager.this);
82 void initialSubmitWriteTransaction() {
84 submitWriteTransaction();
87 public synchronized boolean attemptToRegisterHandler(final ReadyForNewTransactionChainHandler readyForNewTransactionChainHandler) {
88 if (TransactionChainManagerStatus.SHUTTING_DOWN.equals(this.transactionChainManagerStatus)
89 && null == this.readyForNewTransactionChainHandler) {
90 this.readyForNewTransactionChainHandler = readyForNewTransactionChainHandler;
91 if (managerRegistration == null) {
92 this.readyForNewTransactionChainHandler.onReadyForNewTransactionChain();
100 boolean submitWriteTransaction() {
101 if (!submitIsEnabled) {
102 LOG.trace("transaction not committed - submit block issued");
105 synchronized (txLock) {
107 LOG.trace("nothing to commit - submit returns true");
110 final CheckedFuture<Void, TransactionCommitFailedException> submitFuture = wTx.submit();
111 Futures.addCallback(submitFuture, new FutureCallback<Void>() {
113 public void onSuccess(Void result) {
118 public void onFailure(Throwable t) {
119 if (t instanceof TransactionCommitFailedException) {
120 LOG.error("Transaction commit failed. {}", t);
122 LOG.error("Exception during transaction submitting. {}", t);
131 public void cancelWriteTransaction() {
132 // there is no cancel txn in ping-pong broker. So we need to drop the chain and recreate it.
133 // since the chain is created per device, there won't be any other txns other than ones we created.
137 <T extends DataObject> void addDeleteOperationTotTxChain(final LogicalDatastoreType store,
138 final InstanceIdentifier<T> path) {
139 final WriteTransaction writeTx = getTransactionSafely();
140 writeTx.delete(store, path);
143 <T extends DataObject> void writeToTransaction(final LogicalDatastoreType store,
144 final InstanceIdentifier<T> path, final T data) {
145 final WriteTransaction writeTx = getTransactionSafely();
146 writeTx.put(store, path, data);
150 public void onTransactionChainFailed(final TransactionChain<?, ?> chain,
151 final AsyncTransaction<?, ?> transaction, final Throwable cause) {
152 LOG.warn("txChain failed -> recreating", cause);
157 public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
158 // NOOP - only yet, here is probably place for notification to get new WriteTransaction
161 private void recreateTxChain() {
162 txChainFactory.close();
163 createTxChain(dataBroker);
164 synchronized (txLock) {
170 private WriteTransaction getTransactionSafely() {
171 if (wTx == null && !TransactionChainManagerStatus.SHUTTING_DOWN.equals(transactionChainManagerStatus)) {
172 synchronized (txLock) {
174 wTx = txChainFactory.newWriteOnlyTransaction();
182 void enableSubmit() {
183 submitIsEnabled = true;
187 * When a device disconnects from a node of the cluster, the device context gets closed. With that the txChainMgr
188 * status is set to SHUTTING_DOWN and is closed.
189 * When the EntityOwnershipService notifies and is derived that this was indeed the last node from which the device
190 * had disconnected, then we clean the inventory.
191 * Called from DeviceContext
193 public void cleanupPostClosure() {
194 LOG.debug("Removing node {} from operational DS.", nodeII);
195 synchronized (txLock) {
196 final WriteTransaction writeTx;
198 //TODO(Kamal): Fix this. This might cause two txChain Manager working on the same node.
199 if (txChainFactory == null) {
200 LOG.info("Creating new Txn Chain Factory for cleanup purposes - Race Condition Hazard, " +
201 "Concurrent Modification Hazard, node:{}", nodeII);
202 createTxChain(dataBroker);
205 if (TransactionChainManagerStatus.SHUTTING_DOWN.equals(transactionChainManagerStatus)) {
206 // status is already shutdown. so get the tx directly
207 writeTx = txChainFactory.newWriteOnlyTransaction();
209 writeTx = getTransactionSafely();
212 this.transactionChainManagerStatus = TransactionChainManagerStatus.SHUTTING_DOWN;
213 writeTx.delete(LogicalDatastoreType.OPERATIONAL, nodeII);
214 LOG.debug("Delete node {} from operational DS put to write transaction.", nodeII);
216 CheckedFuture<Void, TransactionCommitFailedException> submitsFuture = writeTx.submit();
217 LOG.debug("Delete node {} from operational DS write transaction submitted.", nodeII);
219 Futures.addCallback(submitsFuture, new FutureCallback<Void>() {
221 public void onSuccess(final Void aVoid) {
222 LOG.debug("Removing node {} from operational DS successful .", nodeII);
223 notifyReadyForNewTransactionChainAndCloseFactory();
227 public void onFailure(final Throwable throwable) {
228 LOG.info("Attempt to close transaction chain factory failed.", throwable);
229 notifyReadyForNewTransactionChainAndCloseFactory();
236 private void notifyReadyForNewTransactionChainAndCloseFactory() {
237 if(managerRegistration == null){
238 LOG.warn("managerRegistration is null");
241 synchronized (this) {
243 if (managerRegistration != null) {
244 LOG.debug("Closing registration in manager.");
245 managerRegistration.close();
247 } catch (Exception e) {
248 LOG.warn("Failed to close transaction chain manager's registration.", e);
250 managerRegistration = null;
251 if (null != readyForNewTransactionChainHandler) {
252 readyForNewTransactionChainHandler.onReadyForNewTransactionChain();
255 txChainFactory.close();
256 txChainFactory = null;
257 LOG.debug("Transaction chain factory closed.");
261 public void close() {
262 LOG.debug("closing txChainManager without cleanup of node {} from operational DS.", nodeII);
263 synchronized (txLock) {
264 this.transactionChainManagerStatus = TransactionChainManagerStatus.SHUTTING_DOWN;
265 notifyReadyForNewTransactionChainAndCloseFactory();
270 public enum TransactionChainManagerStatus {
271 WORKING, SHUTTING_DOWN;