X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FAbstractFrontendHistory.java;h=dbea3a1cbb5cd00058c0bcf38dff6ce7f0b1e1b1;hp=7ddad749d26552676a8057b40ca7526677bac130;hb=de64c6bbf2d5aeb51f4036f9dd606a9bf6f71afb;hpb=cd2a6fa0d8fa6281be28d3c7b9828ecf4e932811 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java index 7ddad749d2..dbea3a1cbb 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java @@ -7,14 +7,18 @@ */ package org.opendaylight.controller.cluster.datastore; +import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; import com.google.common.base.Ticker; import java.util.HashMap; import java.util.Map; import java.util.Optional; import javax.annotation.Nullable; +import org.opendaylight.controller.cluster.access.commands.AbstractReadTransactionRequest; import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest; import org.opendaylight.controller.cluster.access.commands.OutOfOrderRequestException; +import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest; +import org.opendaylight.controller.cluster.access.commands.TransactionPurgeResponse; import org.opendaylight.controller.cluster.access.commands.TransactionRequest; import org.opendaylight.controller.cluster.access.commands.TransactionSuccess; import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; @@ -55,41 +59,69 @@ abstract class AbstractFrontendHistory implements Identifiable handleTransactionRequest(final TransactionRequest request, final RequestEnvelope envelope, final long now) throws RequestException { - - // FIXME: handle purging of transactions - final TransactionIdentifier id = request.getTarget(); - FrontendTransaction tx = transactions.get(id); - if (tx == null) { - // The transaction does not exist and we are about to create it, check sequence number - if (request.getSequence() != 0) { - LOG.debug("{}: no transaction state present, unexpected request {}", persistenceId(), request); - throw UNSEQUENCED_START; + + FrontendTransaction tx; + if (request instanceof TransactionPurgeRequest) { + tx = transactions.remove(id); + if (tx == null) { + // We have no record of the transaction, nothing to do + LOG.debug("{}: no state for transaction {}, purge is complete", persistenceId(), id); + return new TransactionPurgeResponse(id, request.getSequence()); } + } else { + tx = transactions.get(id); + if (tx == null) { + // The transaction does not exist and we are about to create it, check sequence number + if (request.getSequence() != 0) { + LOG.debug("{}: no transaction state present, unexpected request {}", persistenceId(), request); + throw UNSEQUENCED_START; + } - if (request instanceof CommitLocalTransactionRequest) { - tx = createReadyTransaction(id, ((CommitLocalTransactionRequest) request).getModification()); - LOG.debug("{}: allocated new ready transaction {}", persistenceId(), id); + tx = createTransaction(request, id); + transactions.put(id, tx); } else { - tx = createOpenTransaction(id); - LOG.debug("{}: allocated new open transaction {}", persistenceId(), id); + final Optional> maybeReplay = tx.replaySequence(request.getSequence()); + if (maybeReplay.isPresent()) { + final TransactionSuccess replay = maybeReplay.get(); + LOG.debug("{}: envelope {} replaying response {}", persistenceId(), envelope, replay); + return replay; + } } + } - transactions.put(id, tx); - } else { - final Optional> replay = tx.replaySequence(request.getSequence()); - if (replay.isPresent()) { - return replay.get(); + return tx.handleRequest(request, envelope, now); + } + + private FrontendTransaction createTransaction(final TransactionRequest request, final TransactionIdentifier id) + throws RequestException { + if (request instanceof CommitLocalTransactionRequest) { + LOG.debug("{}: allocating new ready transaction {}", persistenceId(), id); + return createReadyTransaction(id, ((CommitLocalTransactionRequest) request).getModification()); + } + if (request instanceof AbstractReadTransactionRequest) { + if (((AbstractReadTransactionRequest) request).isSnapshotOnly()) { + LOG.debug("{}: allocatint new open snapshot {}", persistenceId(), id); + return createOpenSnapshot(id); } } - return tx.handleRequest(request, envelope, now); + LOG.debug("{}: allocating new open transaction {}", persistenceId(), id); + return createOpenTransaction(id); } + abstract FrontendTransaction createOpenSnapshot(TransactionIdentifier id) throws RequestException; + abstract FrontendTransaction createOpenTransaction(TransactionIdentifier id) throws RequestException; abstract FrontendTransaction createReadyTransaction(TransactionIdentifier id, DataTreeModification mod) throws RequestException; - abstract ShardDataTreeCohort createReadyCohort(final TransactionIdentifier id, final DataTreeModification mod); + abstract ShardDataTreeCohort createReadyCohort(TransactionIdentifier id, DataTreeModification mod); + + @Override + public String toString() { + return MoreObjects.toStringHelper(this).omitNullValues().add("identifier", getIdentifier()) + .add("persistenceId", persistenceId).add("transactions", transactions).toString(); + } }