BUG-8515: make sure we retry connection on NotLeaderException
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / ClientLocalHistory.java
index b22f2bdc7b79853a9b194fdf04f699c2ccaf1836..493eb4089eb86cf9f119bfdf5589f780404b3715 100644 (file)
@@ -7,56 +7,87 @@
  */
 package org.opendaylight.controller.cluster.databroker.actors.dds;
 
-import akka.actor.ActorRef;
 import com.google.common.annotations.Beta;
 import com.google.common.base.Preconditions;
-import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-import javax.annotation.concurrent.NotThreadSafe;
+import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 
 /**
  * Client-side view of a local history. This class tracks all state related to a particular history and routes
  * frontend requests towards the backend.
  *
+ * <p>
  * This interface is used by the world outside of the actor system and in the actor system it is manifested via
- * its client actor. That requires some state transfer with {@link DistributedDataStoreClientBehavior}. In order to
+ * its client actor. That requires some state transfer with {@link AbstractDataStoreClientBehavior}. In order to
  * reduce request latency, all messages are carbon-copied (and enqueued first) to the client actor.
  *
  * @author Robert Varga
  */
 @Beta
-@NotThreadSafe
-public final class ClientLocalHistory implements AutoCloseable {
-    private static final AtomicIntegerFieldUpdater<ClientLocalHistory> CLOSED_UPDATER =
-            AtomicIntegerFieldUpdater.newUpdater(ClientLocalHistory.class, "state");
-    private static final int IDLE_STATE = 0;
-    private static final int CLOSED_STATE = 1;
+public class ClientLocalHistory extends AbstractClientHistory implements AutoCloseable {
+    ClientLocalHistory(final AbstractDataStoreClientBehavior client, final LocalHistoryIdentifier historyId) {
+        super(client, historyId);
+    }
 
-    private final LocalHistoryIdentifier historyId;
-    private final ActorRef backendActor;
-    private final ActorRef clientActor;
+    @Override
+    public void close() {
+        doClose();
+    }
 
-    private volatile int state = IDLE_STATE;
+    private State ensureIdleState() {
+        final State local = state();
+        Preconditions.checkState(local == State.IDLE, "Local history %s state is %s", this, local);
+        return local;
+    }
 
-    ClientLocalHistory(final DistributedDataStoreClientBehavior client, final long historyId,
-            final ActorRef backendActor) {
-        this.clientActor = client.self();
-        this.backendActor = Preconditions.checkNotNull(backendActor);
-        this.historyId = new LocalHistoryIdentifier(client.getIdentifier(), historyId);
+    @Override
+    ClientSnapshot doCreateSnapshot() {
+        ensureIdleState();
+        return new ClientSnapshot(this, new TransactionIdentifier(getIdentifier(), nextTx()));
     }
 
-    private void checkNotClosed() {
-        Preconditions.checkState(state != CLOSED_STATE, "Local history %s has been closed", historyId);
+    @Override
+    ClientTransaction doCreateTransaction() {
+        updateState(ensureIdleState(), State.TX_OPEN);
+        return new ClientTransaction(this, new TransactionIdentifier(getIdentifier(), nextTx()));
     }
 
     @Override
-    public void close() {
-        if (CLOSED_UPDATER.compareAndSet(this, IDLE_STATE, CLOSED_STATE)) {
-            // FIXME: signal close to both client actor and backend actor
-        } else if (state != CLOSED_STATE) {
-            throw new IllegalStateException("Cannot close history with an open transaction");
+    void onTransactionAbort(final AbstractClientHandle<?> snap) {
+        if (snap instanceof ClientTransaction) {
+            final State local = state();
+            if (local == State.TX_OPEN) {
+                updateState(local, State.IDLE);
+            }
         }
+
+        super.onTransactionAbort(snap);
     }
 
-    // FIXME: add client requests related to a particular local history
+    @Override
+    AbstractTransactionCommitCohort onTransactionReady(final ClientTransaction tx,
+            final AbstractTransactionCommitCohort cohort) {
+
+        final State local = state();
+        switch (local) {
+            case CLOSED:
+                return super.onTransactionReady(tx, cohort);
+            case IDLE:
+                throw new IllegalStateException(String.format("Local history %s is idle when readying transaction %s",
+                    this, tx.getIdentifier()));
+            case TX_OPEN:
+                updateState(local, State.IDLE);
+                return super.onTransactionReady(tx, cohort);
+            default:
+                throw new IllegalStateException(String.format("Local history %s in unhandled state %s", this, local));
+
+        }
+    }
+
+    @Override
+    ProxyHistory createHistoryProxy(final LocalHistoryIdentifier historyId,
+            final AbstractClientConnection<ShardBackendInfo> connection) {
+        return ProxyHistory.createClient(this, connection, historyId);
+    }
 }