Code Review
/
controller.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
review
|
tree
raw
|
inline
| side by side
Merge "Add Distributed DataStore as a dependency of the Simulator"
[controller.git]
/
opendaylight
/
md-sal
/
sal-distributed-datastore
/
src
/
main
/
java
/
org
/
opendaylight
/
controller
/
cluster
/
datastore
/
ShardCommitCoordinator.java
diff --git
a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java
b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java
index f3b4e416403b0594a22da9ff47de2f68a1d284cc..165e272d8b09ca1631a94e7a1e7d14a4370bb595 100644
(file)
--- a/
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java
+++ b/
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java
@@
-7,6
+7,11
@@
*/
package org.opendaylight.controller.cluster.datastore;
*/
package org.opendaylight.controller.cluster.datastore;
+import akka.actor.ActorRef;
+import akka.actor.Status;
+import akka.event.LoggingAdapter;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.ExecutionException;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.ExecutionException;
@@
-15,12
+20,6
@@
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransacti
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.modification.Modification;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.modification.Modification;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import akka.actor.ActorRef;
-import akka.actor.Status;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
/**
* Coordinates commits for a shard ensuring only one concurrent 3-phase commit.
/**
* Coordinates commits for a shard ensuring only one concurrent 3-phase commit.
@@
-29,14
+28,6
@@
import com.google.common.cache.CacheBuilder;
*/
public class ShardCommitCoordinator {
*/
public class ShardCommitCoordinator {
- private static final Logger LOG = LoggerFactory.getLogger(ShardCommitCoordinator.class);
-
- private static final Object CAN_COMMIT_REPLY_TRUE =
- new CanCommitTransactionReply(Boolean.TRUE).toSerializable();
-
- private static final Object CAN_COMMIT_REPLY_FALSE =
- new CanCommitTransactionReply(Boolean.FALSE).toSerializable();
-
private final Cache<String, CohortEntry> cohortCache;
private CohortEntry currentCohortEntry;
private final Cache<String, CohortEntry> cohortCache;
private CohortEntry currentCohortEntry;
@@
-45,11
+36,18
@@
public class ShardCommitCoordinator {
private final int queueCapacity;
private final int queueCapacity;
- public ShardCommitCoordinator(long cacheExpiryTimeoutInSec, int queueCapacity) {
+ private final LoggingAdapter log;
+
+ private final String name;
+
+ public ShardCommitCoordinator(long cacheExpiryTimeoutInSec, int queueCapacity, LoggingAdapter log,
+ String name) {
cohortCache = CacheBuilder.newBuilder().expireAfterAccess(
cacheExpiryTimeoutInSec, TimeUnit.SECONDS).build();
this.queueCapacity = queueCapacity;
cohortCache = CacheBuilder.newBuilder().expireAfterAccess(
cacheExpiryTimeoutInSec, TimeUnit.SECONDS).build();
this.queueCapacity = queueCapacity;
+ this.log = log;
+ this.name = name;
// We use a LinkedList here to avoid synchronization overhead with concurrent queue impls
// since this should only be accessed on the shard's dispatcher.
// We use a LinkedList here to avoid synchronization overhead with concurrent queue impls
// since this should only be accessed on the shard's dispatcher.
@@
-80,9
+78,9
@@
public class ShardCommitCoordinator {
public void handleCanCommit(CanCommitTransaction canCommit, final ActorRef sender,
final ActorRef shard) {
String transactionID = canCommit.getTransactionID();
public void handleCanCommit(CanCommitTransaction canCommit, final ActorRef sender,
final ActorRef shard) {
String transactionID = canCommit.getTransactionID();
- if(
LOG
.isDebugEnabled()) {
-
LOG.debug("
Processing canCommit for transaction {} for shard {}",
- transactionID, shard.path());
+ if(
log
.isDebugEnabled()) {
+
log.debug("{}:
Processing canCommit for transaction {} for shard {}",
+
name,
transactionID, shard.path());
}
// Lookup the cohort entry that was cached previously (or should have been) by
}
// Lookup the cohort entry that was cached previously (or should have been) by
@@
-92,8
+90,8
@@
public class ShardCommitCoordinator {
// Either canCommit was invoked before ready(shouldn't happen) or a long time passed
// between canCommit and ready and the entry was expired from the cache.
IllegalStateException ex = new IllegalStateException(
// Either canCommit was invoked before ready(shouldn't happen) or a long time passed
// between canCommit and ready and the entry was expired from the cache.
IllegalStateException ex = new IllegalStateException(
- String.format("
No cohort entry found for transaction %s"
, transactionID));
-
LOG
.error(ex.getMessage());
+ String.format("
%s: No cohort entry found for transaction %s", name
, transactionID));
+
log
.error(ex.getMessage());
sender.tell(new Status.Failure(ex), shard);
return;
}
sender.tell(new Status.Failure(ex), shard);
return;
}
@@
-104,8
+102,8
@@
public class ShardCommitCoordinator {
if(currentCohortEntry != null) {
// There's already a Tx commit in progress - attempt to queue this entry to be
// committed after the current Tx completes.
if(currentCohortEntry != null) {
// There's already a Tx commit in progress - attempt to queue this entry to be
// committed after the current Tx completes.
-
LOG.debug("
Transaction {} is already in progress - queueing transaction {}",
- currentCohortEntry.getTransactionID(), transactionID);
+
log.debug("{}:
Transaction {} is already in progress - queueing transaction {}",
+
name,
currentCohortEntry.getTransactionID(), transactionID);
if(queuedCohortEntries.size() < queueCapacity) {
queuedCohortEntries.offer(cohortEntry);
if(queuedCohortEntries.size() < queueCapacity) {
queuedCohortEntries.offer(cohortEntry);
@@
-113,10
+111,10
@@
public class ShardCommitCoordinator {
removeCohortEntry(transactionID);
RuntimeException ex = new RuntimeException(
removeCohortEntry(transactionID);
RuntimeException ex = new RuntimeException(
- String.format("Could not enqueue transaction %s - the maximum commit queue"+
+ String.format("
%s:
Could not enqueue transaction %s - the maximum commit queue"+
" capacity %d has been reached.",
" capacity %d has been reached.",
- transactionID, queueCapacity));
-
LOG
.error(ex.getMessage());
+
name,
transactionID, queueCapacity));
+
log
.error(ex.getMessage());
sender.tell(new Status.Failure(ex), shard);
}
} else {
sender.tell(new Status.Failure(ex), shard);
}
} else {
@@
-138,14
+136,15
@@
public class ShardCommitCoordinator {
Boolean canCommit = cohortEntry.getCohort().canCommit().get();
cohortEntry.getCanCommitSender().tell(
Boolean canCommit = cohortEntry.getCohort().canCommit().get();
cohortEntry.getCanCommitSender().tell(
- canCommit ? CAN_COMMIT_REPLY_TRUE : CAN_COMMIT_REPLY_FALSE, cohortEntry.getShard());
+ canCommit ? CanCommitTransactionReply.YES.toSerializable() :
+ CanCommitTransactionReply.NO.toSerializable(), cohortEntry.getShard());
if(!canCommit) {
// Remove the entry from the cache now since the Tx will be aborted.
removeCohortEntry(cohortEntry.getTransactionID());
}
} catch (InterruptedException | ExecutionException e) {
if(!canCommit) {
// Remove the entry from the cache now since the Tx will be aborted.
removeCohortEntry(cohortEntry.getTransactionID());
}
} catch (InterruptedException | ExecutionException e) {
-
LOG.debug("An exception occurred during canCommit"
, e);
+
log.debug("{}: An exception occurred during canCommit: {}", name
, e);
// Remove the entry from the cache now since the Tx will be aborted.
removeCohortEntry(cohortEntry.getTransactionID());
// Remove the entry from the cache now since the Tx will be aborted.
removeCohortEntry(cohortEntry.getTransactionID());
@@
-206,6
+205,7
@@
public class ShardCommitCoordinator {
// Dequeue the next cohort entry waiting in the queue.
currentCohortEntry = queuedCohortEntries.poll();
if(currentCohortEntry != null) {
// Dequeue the next cohort entry waiting in the queue.
currentCohortEntry = queuedCohortEntries.poll();
if(currentCohortEntry != null) {
+ currentCohortEntry.updateLastAccessTime();
doCanCommit(currentCohortEntry);
}
}
doCanCommit(currentCohortEntry);
}
}