this.schemaContext = schemaContext;
this.dataPersistenceProvider = (datastoreContext.isPersistent()) ? new PersistentDataProvider() : new NonPersistentRaftDataProvider();
this.schemaContext = schemaContext;
this.dataPersistenceProvider = (datastoreContext.isPersistent()) ? new PersistentDataProvider() : new NonPersistentRaftDataProvider();
- LOG.info("Shard created : {} persistent : {}", name, datastoreContext.isPersistent());
+ LOG.info("Shard created : {}, persistent : {}", name, datastoreContext.isPersistent());
store = InMemoryDOMDataStoreFactory.create(name.toString(), null,
datastoreContext.getDataStoreProperties());
store = InMemoryDOMDataStoreFactory.create(name.toString(), null,
datastoreContext.getDataStoreProperties());
transactionCommitTimeout = TimeUnit.MILLISECONDS.convert(
datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS);
transactionCommitTimeout = TimeUnit.MILLISECONDS.convert(
datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS);
- LOG.debug("onReceiveRecover: Received message {} from {}",
- message.getClass().toString(),
- getSender());
+ LOG.debug("{}: onReceiveRecover: Received message {} from {}", persistenceId(),
+ message.getClass().toString(), getSender());
// Even though recovery failed, we still need to finish our recovery, eg send the
// ActorInitialized message and start the txCommitTimeoutCheckSchedule.
// Even though recovery failed, we still need to finish our recovery, eg send the
// ActorInitialized message and start the txCommitTimeoutCheckSchedule.
- LOG.debug("onReceiveCommand: Received message {} from {}", message, getSender());
+ LOG.debug("{}: onReceiveCommand: Received message {} from {}", persistenceId(), message, getSender());
if(cohortEntry != null) {
long elapsed = System.currentTimeMillis() - cohortEntry.getLastAccessTime();
if(elapsed > transactionCommitTimeout) {
if(cohortEntry != null) {
long elapsed = System.currentTimeMillis() - cohortEntry.getLastAccessTime();
if(elapsed > transactionCommitTimeout) {
- LOG.warning("Current transaction {} has timed out after {} ms - aborting",
- cohortEntry.getTransactionID(), transactionCommitTimeout);
+ LOG.warning("{}: Current transaction {} has timed out after {} ms - aborting",
+ persistenceId(), cohortEntry.getTransactionID(), transactionCommitTimeout);
private void handleCommitTransaction(final CommitTransaction commit) {
final String transactionID = commit.getTransactionID();
private void handleCommitTransaction(final CommitTransaction commit) {
final String transactionID = commit.getTransactionID();
// We're not the current Tx - the Tx was likely expired b/c it took too long in
// between the canCommit and commit messages.
IllegalStateException ex = new IllegalStateException(
// We're not the current Tx - the Tx was likely expired b/c it took too long in
// between the canCommit and commit messages.
IllegalStateException ex = new IllegalStateException(
- String.format("Cannot commit transaction %s - it is not the current transaction",
- transactionID));
+ String.format("%s: Cannot commit transaction %s - it is not the current transaction",
+ persistenceId(), transactionID));
LOG.error(ex.getMessage());
shardMBean.incrementFailedTransactionsCount();
getSender().tell(new akka.actor.Status.Failure(ex), getSelf());
LOG.error(ex.getMessage());
shardMBean.incrementFailedTransactionsCount();
getSender().tell(new akka.actor.Status.Failure(ex), getSelf());
Shard.this.persistData(getSender(), transactionID,
new ModificationPayload(cohortEntry.getModification()));
}
Shard.this.persistData(getSender(), transactionID,
new ModificationPayload(cohortEntry.getModification()));
}
- } catch (InterruptedException | ExecutionException | IOException e) {
- LOG.error(e, "An exception occurred while preCommitting transaction {}",
- cohortEntry.getTransactionID());
+ } catch (Exception e) {
+ LOG.error(e, "{} An exception occurred while preCommitting transaction {}",
+ persistenceId(), cohortEntry.getTransactionID());
shardMBean.incrementFailedTransactionsCount();
getSender().tell(new akka.actor.Status.Failure(e), getSelf());
}
shardMBean.incrementFailedTransactionsCount();
getSender().tell(new akka.actor.Status.Failure(e), getSelf());
}
// This really shouldn't happen - it likely means that persistence or replication
// took so long to complete such that the cohort entry was expired from the cache.
IllegalStateException ex = new IllegalStateException(
// This really shouldn't happen - it likely means that persistence or replication
// took so long to complete such that the cohort entry was expired from the cache.
IllegalStateException ex = new IllegalStateException(
- String.format("Could not finish committing transaction %s - no CohortEntry found",
- transactionID));
+ String.format("%s: Could not finish committing transaction %s - no CohortEntry found",
+ persistenceId(), transactionID));
LOG.error(ex.getMessage());
sender.tell(new akka.actor.Status.Failure(ex), getSelf());
}
LOG.error(ex.getMessage());
sender.tell(new akka.actor.Status.Failure(ex), getSelf());
}
shardMBean.incrementCommittedTransactionCount();
shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
shardMBean.incrementCommittedTransactionCount();
shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
- LOG.error(e, "An exception occurred while committing transaction {}", transactionID);
+ LOG.error(e, "{}, An exception occurred while committing transaction {}", persistenceId(), transactionID);
commitCoordinator.handleCanCommit(canCommit, getSender(), self());
}
private void handleForwardedReadyTransaction(ForwardedReadyTransaction ready) {
commitCoordinator.handleCanCommit(canCommit, getSender(), self());
}
private void handleForwardedReadyTransaction(ForwardedReadyTransaction ready) {
- LOG.debug("Readying transaction {}, client version {}", ready.getTransactionID(),
- ready.getTxnClientVersion());
+ LOG.debug("{}: Readying transaction {}, client version {}", persistenceId(),
+ ready.getTransactionID(), ready.getTxnClientVersion());
// This message is forwarded by the ShardTransaction on ready. We cache the cohort in the
// commitCoordinator in preparation for the subsequent three phase commit initiated by
// This message is forwarded by the ShardTransaction on ready. We cache the cohort in the
// commitCoordinator in preparation for the subsequent three phase commit initiated by
// to provide the compatible behavior.
ActorRef replyActorPath = self();
if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) {
// to provide the compatible behavior.
ActorRef replyActorPath = self();
if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) {
replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
ready.getTransactionID()));
}
replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
ready.getTransactionID()));
}
void doAbortTransaction(final String transactionID, final ActorRef sender) {
final CohortEntry cohortEntry = commitCoordinator.getCohortEntryIfCurrent(transactionID);
if(cohortEntry != null) {
void doAbortTransaction(final String transactionID, final ActorRef sender) {
final CohortEntry cohortEntry = commitCoordinator.getCohortEntryIfCurrent(transactionID);
if(cohortEntry != null) {
// We don't remove the cached cohort entry here (ie pass false) in case the Tx was
// aborted during replication in which case we may still commit locally if replication
// We don't remove the cached cohort entry here (ie pass false) in case the Tx was
// aborted during replication in which case we may still commit locally if replication
} else if (getLeader() != null) {
getLeader().forward(message, getContext());
} else {
} else if (getLeader() != null) {
getLeader().forward(message, getContext());
} else {
- getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(
- "Could not find shard leader so transaction cannot be created. This typically happens" +
+ getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(String.format(
+ "Could not find leader for shard %s so transaction cannot be created. This typically happens" +
shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
} catch (InterruptedException | ExecutionException e) {
shardMBean.incrementFailedTransactionsCount();
shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
} catch (InterruptedException | ExecutionException e) {
shardMBean.incrementFailedTransactionsCount();
ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
NormalizedNode<?, ?>>> registration;
if(isLeader()) {
registration = doChangeListenerRegistration(registerChangeListener);
} else {
ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
NormalizedNode<?, ?>>> registration;
if(isLeader()) {
registration = doChangeListenerRegistration(registerChangeListener);
} else {
ActorRef listenerRegistration = getContext().actorOf(
DataChangeListenerRegistration.props(registration));
ActorRef listenerRegistration = getContext().actorOf(
DataChangeListenerRegistration.props(registration));
- LOG.debug("registerDataChangeListener sending reply, listenerRegistrationPath = {} ",
- listenerRegistration.path());
+ LOG.debug("{}: registerDataChangeListener sending reply, listenerRegistrationPath = {} ",
+ persistenceId(), listenerRegistration.path());
AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener =
new DataChangeListenerProxy(dataChangeListenerPath);
AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener =
new DataChangeListenerProxy(dataChangeListenerPath);
return store.registerChangeListener(registerChangeListener.getPath(), listener,
registerChangeListener.getScope());
return store.registerChangeListener(registerChangeListener.getPath(), listener,
registerChangeListener.getScope());
try {
currentLogRecoveryBatch.add(((ModificationPayload) data).getModification());
} catch (ClassNotFoundException | IOException e) {
try {
currentLogRecoveryBatch.add(((ModificationPayload) data).getModification());
} catch (ClassNotFoundException | IOException e) {
}
} else if (data instanceof CompositeModificationPayload) {
currentLogRecoveryBatch.add(((CompositeModificationPayload) data).getModification());
} else if (data instanceof CompositeModificationByteStringPayload) {
currentLogRecoveryBatch.add(((CompositeModificationByteStringPayload) data).getModification());
} else {
}
} else if (data instanceof CompositeModificationPayload) {
currentLogRecoveryBatch.add(((CompositeModificationPayload) data).getModification());
} else if (data instanceof CompositeModificationByteStringPayload) {
currentLogRecoveryBatch.add(((CompositeModificationByteStringPayload) data).getModification());
} else {
}
recoveryCoordinator.submit(snapshotBytes, store.newWriteOnlyTransaction());
if(LOG.isDebugEnabled()) {
}
recoveryCoordinator.submit(snapshotBytes, store.newWriteOnlyTransaction());
if(LOG.isDebugEnabled()) {
}
recoveryCoordinator.submit(currentLogRecoveryBatch, store.newWriteOnlyTransaction());
if(LOG.isDebugEnabled()) {
}
recoveryCoordinator.submit(currentLogRecoveryBatch, store.newWriteOnlyTransaction());
if(LOG.isDebugEnabled()) {
Collection<DOMStoreWriteTransaction> txList = recoveryCoordinator.getTransactions();
if(LOG.isDebugEnabled()) {
Collection<DOMStoreWriteTransaction> txList = recoveryCoordinator.getTransactions();
if(LOG.isDebugEnabled()) {
shardMBean.incrementCommittedTransactionCount();
} catch (InterruptedException | ExecutionException e) {
shardMBean.incrementFailedTransactionsCount();
shardMBean.incrementCommittedTransactionCount();
} catch (InterruptedException | ExecutionException e) {
shardMBean.incrementFailedTransactionsCount();
try {
applyModificationToState(clientActor, identifier, ((ModificationPayload) data).getModification());
} catch (ClassNotFoundException | IOException e) {
try {
applyModificationToState(clientActor, identifier, ((ModificationPayload) data).getModification());
} catch (ClassNotFoundException | IOException e) {
- LOG.error("Unknown state received {} Class loader = {} CompositeNodeMod.ClassLoader = {}",
- data, data.getClass().getClassLoader(),
+ LOG.error("{}: Unknown state received {} Class loader = {} CompositeNodeMod.ClassLoader = {}",
+ persistenceId(), data, data.getClass().getClassLoader(),
private void applyModificationToState(ActorRef clientActor, String identifier, Object modification) {
if(modification == null) {
LOG.error(
private void applyModificationToState(ActorRef clientActor, String identifier, Object modification) {
if(modification == null) {
LOG.error(
- "modification is null - this is very unexpected, clientActor = {}, identifier = {}",
- identifier, clientActor != null ? clientActor.path().toString() : null);
+ "{}: modification is null - this is very unexpected, clientActor = {}, identifier = {}",
+ persistenceId(), identifier, clientActor != null ? clientActor.path().toString() : null);
} else if(clientActor == null) {
// There's no clientActor to which to send a commit reply so we must be applying
// replicated state from the leader.
} else if(clientActor == null) {
// There's no clientActor to which to send a commit reply so we must be applying
// replicated state from the leader.
// we can safely commit everything in here. We not need to worry about event notifications
// as they would have already been disabled on the follower
// we can safely commit everything in here. We not need to worry about event notifications
// as they would have already been disabled on the follower
transaction.write(DATASTORE_ROOT, node);
syncCommitTransaction(transaction);
} catch (InterruptedException | ExecutionException e) {
transaction.write(DATASTORE_ROOT, node);
syncCommitTransaction(transaction);
} catch (InterruptedException | ExecutionException e) {
for(Map.Entry<String, DOMStoreTransactionChain> entry : transactionChains.entrySet()){
if(LOG.isDebugEnabled()) {
LOG.debug(
for(Map.Entry<String, DOMStoreTransactionChain> entry : transactionChains.entrySet()){
if(LOG.isDebugEnabled()) {
LOG.debug(
- "onStateChanged: Closing transaction chain {} because shard {} is no longer the leader",
- entry.getKey(), getId());
+ "{}: onStateChanged: Closing transaction chain {} because shard {} is no longer the leader",
+ persistenceId(), entry.getKey(), getId());