Fix potential issue with transaction timeouts 62/14662/4
authortpantelis <tpanteli@brocade.com>
Sat, 31 Jan 2015 04:52:15 +0000 (23:52 -0500)
committertpantelis <tpanteli@brocade.com>
Tue, 3 Feb 2015 05:11:47 +0000 (00:11 -0500)
If CanCommit is sent for a transaction but there is already a current
transaction being committed, the second transaction is queued and
processed when the current transaction completes. However when the
second transaction is de-queued and CanComitted, the last access time
is not updated. There is a periodic timer task that checks if the current
transaction has not been accessed after a relatively long period of time
(eg if CanCommit was sent but Commit isn't sent possibly due to network
failure). In this case the current transaction is aborted. So if a
transaction was queued for a long enough period of time, when it becomes
the current transaction, the timer task could run and abort it before
the front-end has a chance to send the Commit. To prevent this, the
de-queued transaction's access time is updated when it it made the
current transaction.

I also changed the commit() and preCommit() calls to the InMemoryDataStore
to catch Exception as the InMemoryDataStore can throw unchecked
exceptions. If uncaught, the exception is propagated to akka and no
response is sent.

Change-Id: I516bde86f45272f2d40fab76acbc27180e2ad402
Signed-off-by: tpantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java

index 3fc9c142c5e8b40daec717b3292fe1d52afcbf94..744e2c22c69a9de8f16ff048648fbd087066eb41 100644 (file)
@@ -321,7 +321,7 @@ public class Shard extends RaftActor {
                 Shard.this.persistData(getSender(), transactionID,
                         new ModificationPayload(cohortEntry.getModification()));
             }
-        } catch (InterruptedException | ExecutionException | IOException e) {
+        } catch (Exception e) {
             LOG.error(e, "{} An exception occurred while preCommitting transaction {}",
                     persistenceId(), cohortEntry.getTransactionID());
             shardMBean.incrementFailedTransactionsCount();
@@ -373,14 +373,14 @@ public class Shard extends RaftActor {
             shardMBean.incrementCommittedTransactionCount();
             shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
 
-        } catch (InterruptedException | ExecutionException e) {
+        } catch (Exception e) {
             sender.tell(new akka.actor.Status.Failure(e), getSelf());
 
             LOG.error(e, "{}, An exception occurred while committing transaction {}", persistenceId(), transactionID);
             shardMBean.incrementFailedTransactionsCount();
+        } finally {
+            commitCoordinator.currentTransactionComplete(transactionID, true);
         }
-
-        commitCoordinator.currentTransactionComplete(transactionID, true);
     }
 
     private void handleCanCommitTransaction(final CanCommitTransaction canCommit) {
index 659acb745473af389480ff08e19cc68857add93a..165e272d8b09ca1631a94e7a1e7d14a4370bb595 100644 (file)
@@ -205,6 +205,7 @@ public class ShardCommitCoordinator {
             // Dequeue the next cohort entry waiting in the queue.
             currentCohortEntry = queuedCohortEntries.poll();
             if(currentCohortEntry != null) {
+                currentCohortEntry.updateLastAccessTime();
                 doCanCommit(currentCohortEntry);
             }
         }