+
+ LOG.debug("{}: Finishing commit for transaction {}", persistenceId(), cohortEntry.getTransactionID());
+
+ try {
+ // We block on the future here so we don't have to worry about possibly accessing our
+ // state on a different thread outside of our dispatcher. Also, the data store
+ // currently uses a same thread executor anyway.
+ cohortEntry.getCohort().commit().get();
+
+ sender.tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf());
+
+ shardMBean.incrementCommittedTransactionCount();
+ shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
+
+ } catch (Exception e) {
+ sender.tell(new akka.actor.Status.Failure(e), getSelf());
+
+ LOG.error(e, "{}, An exception occurred while committing transaction {}", persistenceId(), transactionID);
+ shardMBean.incrementFailedTransactionsCount();
+ } finally {
+ commitCoordinator.currentTransactionComplete(transactionID, true);
+ }
+ }
+
+ private void handleCanCommitTransaction(final CanCommitTransaction canCommit) {
+ LOG.debug("{}: Can committing transaction {}", persistenceId(), canCommit.getTransactionID());
+ commitCoordinator.handleCanCommit(canCommit, getSender(), self());
+ }
+
+ private void handleForwardedReadyTransaction(ForwardedReadyTransaction ready) {
+ LOG.debug("{}: Readying transaction {}, client version {}", persistenceId(),
+ ready.getTransactionID(), ready.getTxnClientVersion());
+
+ // This message is forwarded by the ShardTransaction on ready. We cache the cohort in the
+ // commitCoordinator in preparation for the subsequent three phase commit initiated by
+ // the front-end.
+ commitCoordinator.transactionReady(ready.getTransactionID(), ready.getCohort(),
+ ready.getModification());
+
+ // Return our actor path as we'll handle the three phase commit, except if the Tx client
+ // version < 1 (Helium-1 version). This means the Tx was initiated by a base Helium version
+ // node. In that case, the subsequent 3-phase commit messages won't contain the
+ // transactionId so to maintain backwards compatibility, we create a separate cohort actor
+ // to provide the compatible behavior.
+ ActorRef replyActorPath = self();
+ if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) {
+ LOG.debug("{}: Creating BackwardsCompatibleThreePhaseCommitCohort", persistenceId());
+ replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
+ ready.getTransactionID()));
+ }
+
+ ReadyTransactionReply readyTransactionReply = new ReadyTransactionReply(
+ Serialization.serializedActorPath(replyActorPath));
+ getSender().tell(ready.isReturnSerialized() ? readyTransactionReply.toSerializable() :
+ readyTransactionReply, getSelf());
+ }
+
+ private void handleAbortTransaction(final AbortTransaction abort) {
+ doAbortTransaction(abort.getTransactionID(), getSender());
+ }
+
+ void doAbortTransaction(final String transactionID, final ActorRef sender) {
+ final CohortEntry cohortEntry = commitCoordinator.getCohortEntryIfCurrent(transactionID);
+ if(cohortEntry != null) {
+ LOG.debug("{}: Aborting transaction {}", persistenceId(), transactionID);
+
+ // We don't remove the cached cohort entry here (ie pass false) in case the Tx was
+ // aborted during replication in which case we may still commit locally if replication
+ // succeeds.
+ commitCoordinator.currentTransactionComplete(transactionID, false);
+
+ final ListenableFuture<Void> future = cohortEntry.getCohort().abort();
+ final ActorRef self = getSelf();
+
+ Futures.addCallback(future, new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(final Void v) {
+ shardMBean.incrementAbortTransactionsCount();
+
+ if(sender != null) {
+ sender.tell(AbortTransactionReply.INSTANCE.toSerializable(), self);
+ }
+ }
+
+ @Override
+ public void onFailure(final Throwable t) {
+ LOG.error(t, "{}: An exception happened during abort", persistenceId());
+
+ if(sender != null) {
+ sender.tell(new akka.actor.Status.Failure(t), self);
+ }
+ }
+ });
+ }
+ }
+
+ private void handleCreateTransaction(final Object message) {
+ if (isLeader()) {
+ createTransaction(CreateTransaction.fromSerializable(message));
+ } else if (getLeader() != null) {
+ getLeader().forward(message, getContext());
+ } else {
+ getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(String.format(
+ "Could not find leader for shard %s so transaction cannot be created. This typically happens" +
+ " when the system is coming up or recovering and a leader is being elected. Try again" +
+ " later.", persistenceId()))), getSelf());
+ }
+ }
+
+ private void closeTransactionChain(final CloseTransactionChain closeTransactionChain) {
+ DOMStoreTransactionChain chain =
+ transactionChains.remove(closeTransactionChain.getTransactionChainId());
+
+ if(chain != null) {
+ chain.close();
+ }
+ }
+
+ private ActorRef createTypedTransactionActor(int transactionType,
+ ShardTransactionIdentifier transactionId, String transactionChainId,
+ short clientVersion ) {
+
+ DOMStoreTransactionFactory factory = store;
+
+ if(!transactionChainId.isEmpty()) {
+ factory = transactionChains.get(transactionChainId);
+ if(factory == null){
+ DOMStoreTransactionChain transactionChain = store.createTransactionChain();
+ transactionChains.put(transactionChainId, transactionChain);
+ factory = transactionChain;
+ }
+ }
+
+ if(this.schemaContext == null) {
+ throw new IllegalStateException("SchemaContext is not set");
+ }
+
+ if (transactionType == TransactionProxy.TransactionType.READ_ONLY.ordinal()) {
+
+ shardMBean.incrementReadOnlyTransactionCount();
+
+ return getContext().actorOf(
+ ShardTransaction.props(factory.newReadOnlyTransaction(), getSelf(),
+ schemaContext,datastoreContext, shardMBean,
+ transactionId.getRemoteTransactionId(), clientVersion),
+ transactionId.toString());
+
+ } else if (transactionType == TransactionProxy.TransactionType.READ_WRITE.ordinal()) {
+
+ shardMBean.incrementReadWriteTransactionCount();
+
+ return getContext().actorOf(
+ ShardTransaction.props(factory.newReadWriteTransaction(), getSelf(),
+ schemaContext, datastoreContext, shardMBean,
+ transactionId.getRemoteTransactionId(), clientVersion),
+ transactionId.toString());
+
+
+ } else if (transactionType == TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) {
+
+ shardMBean.incrementWriteOnlyTransactionCount();
+
+ return getContext().actorOf(
+ ShardTransaction.props(factory.newWriteOnlyTransaction(), getSelf(),
+ schemaContext, datastoreContext, shardMBean,
+ transactionId.getRemoteTransactionId(), clientVersion),
+ transactionId.toString());
+ } else {
+ throw new IllegalArgumentException(
+ "Shard="+name + ":CreateTransaction message has unidentified transaction type="
+ + transactionType);
+ }
+ }
+
+ private void createTransaction(CreateTransaction createTransaction) {
+ try {
+ ActorRef transactionActor = createTransaction(createTransaction.getTransactionType(),
+ createTransaction.getTransactionId(), createTransaction.getTransactionChainId(),
+ createTransaction.getVersion());
+
+ getSender().tell(new CreateTransactionReply(Serialization.serializedActorPath(transactionActor),
+ createTransaction.getTransactionId()).toSerializable(), getSelf());
+ } catch (Exception e) {
+ getSender().tell(new akka.actor.Status.Failure(e), getSelf());
+ }
+ }
+
+ private ActorRef createTransaction(int transactionType, String remoteTransactionId,
+ String transactionChainId, short clientVersion) {
+
+ ShardTransactionIdentifier transactionId =
+ ShardTransactionIdentifier.builder()
+ .remoteTransactionId(remoteTransactionId)
+ .build();
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("{}: Creating transaction : {} ", persistenceId(), transactionId);
+ }
+
+ ActorRef transactionActor = createTypedTransactionActor(transactionType, transactionId,
+ transactionChainId, clientVersion);
+
+ return transactionActor;
+ }
+
+ private void syncCommitTransaction(final DOMStoreWriteTransaction transaction)
+ throws ExecutionException, InterruptedException {
+ DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
+ commitCohort.preCommit().get();
+ commitCohort.commit().get();
+ }
+
+ private void commitWithNewTransaction(final Modification modification) {
+ DOMStoreWriteTransaction tx = store.newWriteOnlyTransaction();
+ modification.apply(tx);
+ try {
+ syncCommitTransaction(tx);
+ shardMBean.incrementCommittedTransactionCount();
+ shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
+ } catch (InterruptedException | ExecutionException e) {
+ shardMBean.incrementFailedTransactionsCount();
+ LOG.error(e, "{}: Failed to commit", persistenceId());
+ }
+ }
+
+ private void updateSchemaContext(final UpdateSchemaContext message) {
+ this.schemaContext = message.getSchemaContext();
+ updateSchemaContext(message.getSchemaContext());
+ store.onGlobalContextUpdated(message.getSchemaContext());
+ }
+
+ @VisibleForTesting
+ void updateSchemaContext(final SchemaContext schemaContext) {
+ store.onGlobalContextUpdated(schemaContext);
+ }
+
+ private void registerChangeListener(final RegisterChangeListener registerChangeListener) {
+
+ LOG.debug("{}: registerDataChangeListener for {}", persistenceId(), registerChangeListener.getPath());
+
+ ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
+ NormalizedNode<?, ?>>> registration;
+ if(isLeader()) {
+ registration = doChangeListenerRegistration(registerChangeListener);
+ } else {
+ LOG.debug("{}: Shard is not the leader - delaying registration", persistenceId());
+
+ DelayedListenerRegistration delayedReg =
+ new DelayedListenerRegistration(registerChangeListener);
+ delayedListenerRegistrations.add(delayedReg);
+ registration = delayedReg;
+ }
+
+ ActorRef listenerRegistration = getContext().actorOf(
+ DataChangeListenerRegistration.props(registration));
+
+ LOG.debug("{}: registerDataChangeListener sending reply, listenerRegistrationPath = {} ",
+ persistenceId(), listenerRegistration.path());
+
+ getSender().tell(new RegisterChangeListenerReply(listenerRegistration.path()), getSelf());
+ }
+
+ private ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
+ NormalizedNode<?, ?>>> doChangeListenerRegistration(
+ final RegisterChangeListener registerChangeListener) {
+
+ ActorSelection dataChangeListenerPath = getContext().system().actorSelection(
+ registerChangeListener.getDataChangeListenerPath());
+
+ // Notify the listener if notifications should be enabled or not
+ // If this shard is the leader then it will enable notifications else
+ // it will not
+ dataChangeListenerPath.tell(new EnableNotification(true), getSelf());
+
+ // Now store a reference to the data change listener so it can be notified
+ // at a later point if notifications should be enabled or disabled
+ dataChangeListeners.add(dataChangeListenerPath);
+
+ AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener =
+ new DataChangeListenerProxy(dataChangeListenerPath);
+
+ LOG.debug("{}: Registering for path {}", persistenceId(), registerChangeListener.getPath());
+
+ return store.registerChangeListener(registerChangeListener.getPath(), listener,
+ registerChangeListener.getScope());
+ }
+
+ private boolean isMetricsCaptureEnabled(){
+ CommonConfig config = new CommonConfig(getContext().system().settings().config());
+ return config.isMetricCaptureEnabled();
+ }
+
+ @Override
+ protected
+ void startLogRecoveryBatch(final int maxBatchSize) {
+ currentLogRecoveryBatch = Lists.newArrayListWithCapacity(maxBatchSize);
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("{}: starting log recovery batch with max size {}", persistenceId(), maxBatchSize);
+ }
+ }
+
+ @Override
+ protected void appendRecoveredLogEntry(final Payload data) {
+ if(data instanceof ModificationPayload) {
+ try {
+ currentLogRecoveryBatch.add(((ModificationPayload) data).getModification());
+ } catch (ClassNotFoundException | IOException e) {
+ LOG.error(e, "{}: Error extracting ModificationPayload", persistenceId());
+ }
+ } else if (data instanceof CompositeModificationPayload) {
+ currentLogRecoveryBatch.add(((CompositeModificationPayload) data).getModification());
+ } else if (data instanceof CompositeModificationByteStringPayload) {
+ currentLogRecoveryBatch.add(((CompositeModificationByteStringPayload) data).getModification());
+ } else {
+ LOG.error("{}: Unknown state received {} during recovery", persistenceId(), data);
+ }
+ }
+
+ @Override
+ protected void applyRecoverySnapshot(final byte[] snapshotBytes) {
+ if(recoveryCoordinator == null) {
+ recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext,
+ LOG, name.toString());
+ }
+
+ recoveryCoordinator.submit(snapshotBytes, store.newWriteOnlyTransaction());
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("{}: submitted recovery sbapshot", persistenceId());
+ }
+ }
+
+ @Override
+ protected void applyCurrentLogRecoveryBatch() {
+ if(recoveryCoordinator == null) {
+ recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext,
+ LOG, name.toString());
+ }
+
+ recoveryCoordinator.submit(currentLogRecoveryBatch, store.newWriteOnlyTransaction());
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("{}: submitted log recovery batch with size {}", persistenceId(),
+ currentLogRecoveryBatch.size());
+ }
+ }
+
+ @Override
+ protected void onRecoveryComplete() {
+ if(recoveryCoordinator != null) {
+ Collection<DOMStoreWriteTransaction> txList = recoveryCoordinator.getTransactions();
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("{}: recovery complete - committing {} Tx's", persistenceId(), txList.size());
+ }
+
+ for(DOMStoreWriteTransaction tx: txList) {