X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatabroker%2Factors%2Fdds%2FClientLocalHistory.java;h=ac1872835ac37a612acdd0e9401c57c17f789c6a;hb=db9a673c114febc785fbd324947ac2c3e3095d06;hp=b22f2bdc7b79853a9b194fdf04f699c2ccaf1836;hpb=d0621d28e507d9f6c0b9445d197f90253d34725d;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientLocalHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientLocalHistory.java index b22f2bdc7b..ac1872835a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientLocalHistory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientLocalHistory.java @@ -7,56 +7,79 @@ */ package org.opendaylight.controller.cluster.databroker.actors.dds; -import akka.actor.ActorRef; import com.google.common.annotations.Beta; import com.google.common.base.Preconditions; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; -import javax.annotation.concurrent.NotThreadSafe; +import org.opendaylight.controller.cluster.access.client.AbstractClientConnection; import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; /** * Client-side view of a local history. This class tracks all state related to a particular history and routes * frontend requests towards the backend. * + *

* This interface is used by the world outside of the actor system and in the actor system it is manifested via - * its client actor. That requires some state transfer with {@link DistributedDataStoreClientBehavior}. In order to + * its client actor. That requires some state transfer with {@link AbstractDataStoreClientBehavior}. In order to * reduce request latency, all messages are carbon-copied (and enqueued first) to the client actor. * * @author Robert Varga */ @Beta -@NotThreadSafe -public final class ClientLocalHistory implements AutoCloseable { - private static final AtomicIntegerFieldUpdater CLOSED_UPDATER = - AtomicIntegerFieldUpdater.newUpdater(ClientLocalHistory.class, "state"); - private static final int IDLE_STATE = 0; - private static final int CLOSED_STATE = 1; - - private final LocalHistoryIdentifier historyId; - private final ActorRef backendActor; - private final ActorRef clientActor; - - private volatile int state = IDLE_STATE; - - ClientLocalHistory(final DistributedDataStoreClientBehavior client, final long historyId, - final ActorRef backendActor) { - this.clientActor = client.self(); - this.backendActor = Preconditions.checkNotNull(backendActor); - this.historyId = new LocalHistoryIdentifier(client.getIdentifier(), historyId); +public final class ClientLocalHistory extends AbstractClientHistory implements AutoCloseable { + ClientLocalHistory(final AbstractDataStoreClientBehavior client, final LocalHistoryIdentifier historyId) { + super(client, historyId); } - private void checkNotClosed() { - Preconditions.checkState(state != CLOSED_STATE, "Local history %s has been closed", historyId); + @Override + public void close() { + final State local = state(); + if (local != State.CLOSED) { + Preconditions.checkState(local == State.IDLE, "Local history %s has an open transaction", this); + updateState(local, State.CLOSED); + } } @Override - public void close() { - if (CLOSED_UPDATER.compareAndSet(this, IDLE_STATE, CLOSED_STATE)) { - // FIXME: signal close to both client actor and backend actor - } else if (state != CLOSED_STATE) { - throw new IllegalStateException("Cannot close history with an open transaction"); + ClientTransaction doCreateTransaction() { + final State local = state(); + Preconditions.checkState(local == State.IDLE, "Local history %s state is %s", this, local); + updateState(local, State.TX_OPEN); + + return new ClientTransaction(this, new TransactionIdentifier(getIdentifier(), nextTx())); + } + + @Override + void onTransactionAbort(final TransactionIdentifier txId) { + final State local = state(); + if (local == State.TX_OPEN) { + updateState(local, State.IDLE); + } + + super.onTransactionAbort(txId); + } + + @Override + AbstractTransactionCommitCohort onTransactionReady(final TransactionIdentifier txId, + final AbstractTransactionCommitCohort cohort) { + final State local = state(); + switch (local) { + case CLOSED: + return super.onTransactionReady(txId, cohort); + case IDLE: + throw new IllegalStateException(String.format("Local history %s is idle when readying transaction %s", + this, txId)); + case TX_OPEN: + updateState(local, State.IDLE); + return super.onTransactionReady(txId, cohort); + default: + throw new IllegalStateException(String.format("Local history %s in unhandled state %s", this, local)); + } } - // FIXME: add client requests related to a particular local history + @Override + ProxyHistory createHistoryProxy(final LocalHistoryIdentifier historyId, + final AbstractClientConnection connection) { + return ProxyHistory.createClient(connection, historyId); + } }