Code Review
/
controller.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
review
|
tree
raw
|
inline
| side by side
Merge "Add Distributed DataStore as a dependency of the Simulator"
[controller.git]
/
opendaylight
/
md-sal
/
sal-distributed-datastore
/
src
/
main
/
java
/
org
/
opendaylight
/
controller
/
cluster
/
datastore
/
ShardCommitCoordinator.java
diff --git
a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java
b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java
index 19fa26682e2a4cea7b637fda85064a3aea0226e5..165e272d8b09ca1631a94e7a1e7d14a4370bb595 100644
(file)
--- a/
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java
+++ b/
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java
@@
-9,6
+9,7
@@
package org.opendaylight.controller.cluster.datastore;
import akka.actor.ActorRef;
import akka.actor.Status;
import akka.actor.ActorRef;
import akka.actor.Status;
+import akka.event.LoggingAdapter;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.util.LinkedList;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.util.LinkedList;
@@
-19,8
+20,6
@@
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransacti
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.modification.Modification;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.modification.Modification;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* Coordinates commits for a shard ensuring only one concurrent 3-phase commit.
/**
* Coordinates commits for a shard ensuring only one concurrent 3-phase commit.
@@
-29,8
+28,6
@@
import org.slf4j.LoggerFactory;
*/
public class ShardCommitCoordinator {
*/
public class ShardCommitCoordinator {
- private static final Logger LOG = LoggerFactory.getLogger(ShardCommitCoordinator.class);
-
private final Cache<String, CohortEntry> cohortCache;
private CohortEntry currentCohortEntry;
private final Cache<String, CohortEntry> cohortCache;
private CohortEntry currentCohortEntry;
@@
-39,11
+36,18
@@
public class ShardCommitCoordinator {
private final int queueCapacity;
private final int queueCapacity;
- public ShardCommitCoordinator(long cacheExpiryTimeoutInSec, int queueCapacity) {
+ private final LoggingAdapter log;
+
+ private final String name;
+
+ public ShardCommitCoordinator(long cacheExpiryTimeoutInSec, int queueCapacity, LoggingAdapter log,
+ String name) {
cohortCache = CacheBuilder.newBuilder().expireAfterAccess(
cacheExpiryTimeoutInSec, TimeUnit.SECONDS).build();
this.queueCapacity = queueCapacity;
cohortCache = CacheBuilder.newBuilder().expireAfterAccess(
cacheExpiryTimeoutInSec, TimeUnit.SECONDS).build();
this.queueCapacity = queueCapacity;
+ this.log = log;
+ this.name = name;
// We use a LinkedList here to avoid synchronization overhead with concurrent queue impls
// since this should only be accessed on the shard's dispatcher.
// We use a LinkedList here to avoid synchronization overhead with concurrent queue impls
// since this should only be accessed on the shard's dispatcher.
@@
-74,9
+78,9
@@
public class ShardCommitCoordinator {
public void handleCanCommit(CanCommitTransaction canCommit, final ActorRef sender,
final ActorRef shard) {
String transactionID = canCommit.getTransactionID();
public void handleCanCommit(CanCommitTransaction canCommit, final ActorRef sender,
final ActorRef shard) {
String transactionID = canCommit.getTransactionID();
- if(
LOG
.isDebugEnabled()) {
-
LOG.debug("
Processing canCommit for transaction {} for shard {}",
- transactionID, shard.path());
+ if(
log
.isDebugEnabled()) {
+
log.debug("{}:
Processing canCommit for transaction {} for shard {}",
+
name,
transactionID, shard.path());
}
// Lookup the cohort entry that was cached previously (or should have been) by
}
// Lookup the cohort entry that was cached previously (or should have been) by
@@
-86,8
+90,8
@@
public class ShardCommitCoordinator {
// Either canCommit was invoked before ready(shouldn't happen) or a long time passed
// between canCommit and ready and the entry was expired from the cache.
IllegalStateException ex = new IllegalStateException(
// Either canCommit was invoked before ready(shouldn't happen) or a long time passed
// between canCommit and ready and the entry was expired from the cache.
IllegalStateException ex = new IllegalStateException(
- String.format("
No cohort entry found for transaction %s"
, transactionID));
-
LOG
.error(ex.getMessage());
+ String.format("
%s: No cohort entry found for transaction %s", name
, transactionID));
+
log
.error(ex.getMessage());
sender.tell(new Status.Failure(ex), shard);
return;
}
sender.tell(new Status.Failure(ex), shard);
return;
}
@@
-98,8
+102,8
@@
public class ShardCommitCoordinator {
if(currentCohortEntry != null) {
// There's already a Tx commit in progress - attempt to queue this entry to be
// committed after the current Tx completes.
if(currentCohortEntry != null) {
// There's already a Tx commit in progress - attempt to queue this entry to be
// committed after the current Tx completes.
-
LOG.debug("
Transaction {} is already in progress - queueing transaction {}",
- currentCohortEntry.getTransactionID(), transactionID);
+
log.debug("{}:
Transaction {} is already in progress - queueing transaction {}",
+
name,
currentCohortEntry.getTransactionID(), transactionID);
if(queuedCohortEntries.size() < queueCapacity) {
queuedCohortEntries.offer(cohortEntry);
if(queuedCohortEntries.size() < queueCapacity) {
queuedCohortEntries.offer(cohortEntry);
@@
-107,10
+111,10
@@
public class ShardCommitCoordinator {
removeCohortEntry(transactionID);
RuntimeException ex = new RuntimeException(
removeCohortEntry(transactionID);
RuntimeException ex = new RuntimeException(
- String.format("Could not enqueue transaction %s - the maximum commit queue"+
+ String.format("
%s:
Could not enqueue transaction %s - the maximum commit queue"+
" capacity %d has been reached.",
" capacity %d has been reached.",
- transactionID, queueCapacity));
-
LOG
.error(ex.getMessage());
+
name,
transactionID, queueCapacity));
+
log
.error(ex.getMessage());
sender.tell(new Status.Failure(ex), shard);
}
} else {
sender.tell(new Status.Failure(ex), shard);
}
} else {
@@
-140,7
+144,7
@@
public class ShardCommitCoordinator {
removeCohortEntry(cohortEntry.getTransactionID());
}
} catch (InterruptedException | ExecutionException e) {
removeCohortEntry(cohortEntry.getTransactionID());
}
} catch (InterruptedException | ExecutionException e) {
-
LOG.debug("An exception occurred during canCommit"
, e);
+
log.debug("{}: An exception occurred during canCommit: {}", name
, e);
// Remove the entry from the cache now since the Tx will be aborted.
removeCohortEntry(cohortEntry.getTransactionID());
// Remove the entry from the cache now since the Tx will be aborted.
removeCohortEntry(cohortEntry.getTransactionID());
@@
-201,6
+205,7
@@
public class ShardCommitCoordinator {
// Dequeue the next cohort entry waiting in the queue.
currentCohortEntry = queuedCohortEntries.poll();
if(currentCohortEntry != null) {
// Dequeue the next cohort entry waiting in the queue.
currentCohortEntry = queuedCohortEntries.poll();
if(currentCohortEntry != null) {
+ currentCohortEntry.updateLastAccessTime();
doCanCommit(currentCohortEntry);
}
}
doCanCommit(currentCohortEntry);
}
}