X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatabroker%2Factors%2Fdds%2FClientLocalHistory.java;h=26b03e39b092c57d00876b030df15d1aeda015e8;hb=b66d5a3c59525a1c7885c3d653d9657a99f4103d;hp=aded49b14426051262e9a1838af476029b274382;hpb=97ff7dff8e58531065833736d5788808ca9e0396;p=controller.git
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientLocalHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientLocalHistory.java
index aded49b144..26b03e39b0 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientLocalHistory.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientLocalHistory.java
@@ -7,60 +7,91 @@
*/
package org.opendaylight.controller.cluster.databroker.actors.dds;
-import akka.actor.ActorRef;
import com.google.common.annotations.Beta;
import com.google.common.base.Preconditions;
-import com.google.common.base.Verify;
-import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
+import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
/**
* Client-side view of a local history. This class tracks all state related to a particular history and routes
* frontend requests towards the backend.
*
+ *
* This interface is used by the world outside of the actor system and in the actor system it is manifested via
- * its client actor. That requires some state transfer with {@link DistributedDataStoreClientBehavior}. In order to
+ * its client actor. That requires some state transfer with {@link AbstractDataStoreClientBehavior}. In order to
* reduce request latency, all messages are carbon-copied (and enqueued first) to the client actor.
*
* @author Robert Varga
*/
@Beta
-public final class ClientLocalHistory implements AutoCloseable {
- private static final AtomicIntegerFieldUpdater STATE_UPDATER =
- AtomicIntegerFieldUpdater.newUpdater(ClientLocalHistory.class, "state");
- private static final int IDLE_STATE = 0;
- private static final int CLOSED_STATE = 1;
+public final class ClientLocalHistory extends AbstractClientHistory implements AutoCloseable {
+ ClientLocalHistory(final AbstractDataStoreClientBehavior client, final LocalHistoryIdentifier historyId) {
+ super(client, historyId);
+ }
- private final ClientIdentifier clientId;
- private final long historyId;
- private final ActorRef backendActor;
- private final ActorRef clientActor;
+ @Override
+ public void close() {
+ final State local = state();
+ if (local != State.CLOSED) {
+ Preconditions.checkState(local == State.IDLE, "Local history %s has an open transaction", this);
+ updateState(local, State.CLOSED);
+ }
+ }
- private volatile int state = IDLE_STATE;
+ private State ensureIdleState() {
+ final State local = state();
+ Preconditions.checkState(local == State.IDLE, "Local history %s state is %s", this, local);
+ return local;
+ }
- ClientLocalHistory(final DistributedDataStoreClientBehavior client, final long historyId,
- final ActorRef backendActor) {
- this.clientActor = client.self();
- this.backendActor = Preconditions.checkNotNull(backendActor);
- this.clientId = Verify.verifyNotNull(client.getIdentifier());
- this.historyId = historyId;
+ @Override
+ ClientSnapshot doCreateSnapshot() {
+ ensureIdleState();
+ return new ClientSnapshot(this, new TransactionIdentifier(getIdentifier(), nextTx()));
}
- private void checkNotClosed() {
- if (state == CLOSED_STATE) {
- throw new IllegalStateException("Local history " + new LocalHistoryIdentifier(clientId, historyId) + " is closed");
+ @Override
+ ClientTransaction doCreateTransaction() {
+ updateState(ensureIdleState(), State.TX_OPEN);
+ return new ClientTransaction(this, new TransactionIdentifier(getIdentifier(), nextTx()));
+ }
+
+ @Override
+ void onTransactionAbort(final AbstractClientHandle> snap) {
+ if (snap instanceof ClientTransaction) {
+ final State local = state();
+ if (local == State.TX_OPEN) {
+ updateState(local, State.IDLE);
+ }
}
+
+ super.onTransactionAbort(snap);
}
@Override
- public void close() {
- if (STATE_UPDATER.compareAndSet(this, IDLE_STATE, CLOSED_STATE)) {
- // FIXME: signal close to both client actor and backend actor
- } else if (state != CLOSED_STATE) {
- throw new IllegalStateException("Cannot close history with an open transaction");
+ AbstractTransactionCommitCohort onTransactionReady(final ClientTransaction tx,
+ final AbstractTransactionCommitCohort cohort) {
+
+ final State local = state();
+ switch (local) {
+ case CLOSED:
+ return super.onTransactionReady(tx, cohort);
+ case IDLE:
+ throw new IllegalStateException(String.format("Local history %s is idle when readying transaction %s",
+ this, tx.getIdentifier()));
+ case TX_OPEN:
+ updateState(local, State.IDLE);
+ return super.onTransactionReady(tx, cohort);
+ default:
+ throw new IllegalStateException(String.format("Local history %s in unhandled state %s", this, local));
+
}
}
- // FIXME: add client requests related to a particular local history
+ @Override
+ ProxyHistory createHistoryProxy(final LocalHistoryIdentifier historyId,
+ final AbstractClientConnection connection) {
+ return ProxyHistory.createClient(connection, historyId);
+ }
}