BUG-5280: implement backend message handling
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / LeaderFrontendState.java
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderFrontendState.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderFrontendState.java
new file mode 100644 (file)
index 0000000..3c65b79
--- /dev/null
@@ -0,0 +1,211 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Range;
+import com.google.common.collect.RangeSet;
+import com.google.common.collect.TreeRangeSet;
+import com.google.common.primitives.UnsignedLong;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryRequest;
+import org.opendaylight.controller.cluster.access.commands.DeadHistoryException;
+import org.opendaylight.controller.cluster.access.commands.DestroyLocalHistoryRequest;
+import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest;
+import org.opendaylight.controller.cluster.access.commands.LocalHistorySuccess;
+import org.opendaylight.controller.cluster.access.commands.OutOfOrderRequestException;
+import org.opendaylight.controller.cluster.access.commands.PurgeLocalHistoryRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionSuccess;
+import org.opendaylight.controller.cluster.access.commands.UnknownHistoryException;
+import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
+import org.opendaylight.controller.cluster.access.concepts.RequestException;
+import org.opendaylight.controller.cluster.access.concepts.UnsupportedRequestException;
+import org.opendaylight.yangtools.concepts.Identifiable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Frontend state as observed by the shard leader. This class is responsible for tracking generations and sequencing
+ * in the frontend/backend conversation.
+ *
+ * @author Robert Varga
+ */
+@NotThreadSafe
+final class LeaderFrontendState implements Identifiable<ClientIdentifier> {
+    private static final Logger LOG = LoggerFactory.getLogger(LeaderFrontendState.class);
+
+    // Histories which have not been purged
+    private final Map<LocalHistoryIdentifier, LocalFrontendHistory> localHistories = new HashMap<>();
+
+    // RangeSet performs automatic merging, hence we keep minimal state tracking information
+    private final RangeSet<UnsignedLong> purgedHistories = TreeRangeSet.create();
+
+    // Used for all standalone transactions
+    private final AbstractFrontendHistory standaloneHistory;
+    private final ShardDataTree tree;
+    private final ClientIdentifier clientId;
+    private final String persistenceId;
+
+    private long expectedTxSequence;
+    private Long lastSeenHistory = null;
+
+
+    // TODO: explicit failover notification
+    //       Record the ActorRef for the originating actor and when we switch to being a leader send a notification
+    //       to the frontend client -- that way it can immediately start sending requests
+
+    // TODO: add statistics:
+    // - number of requests processed
+    // - number of histories processed
+    // - per-RequestException throw counters
+
+    LeaderFrontendState(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree) {
+        this.persistenceId = Preconditions.checkNotNull(persistenceId);
+        this.clientId = Preconditions.checkNotNull(clientId);
+        this.tree = Preconditions.checkNotNull(tree);
+        standaloneHistory = new StandaloneFrontendHistory(persistenceId, clientId, tree);
+    }
+
+    @Override
+    public ClientIdentifier getIdentifier() {
+        return clientId;
+    }
+
+    private void checkRequestSequence(final RequestEnvelope envelope) throws OutOfOrderRequestException {
+        if (expectedTxSequence != envelope.getTxSequence()) {
+            throw new OutOfOrderRequestException(expectedTxSequence);
+        }
+    }
+
+    private void expectNextRequest() {
+        expectedTxSequence++;
+    }
+
+    @Nullable LocalHistorySuccess handleLocalHistoryRequest(final LocalHistoryRequest<?> request,
+            final RequestEnvelope envelope) throws RequestException {
+        checkRequestSequence(envelope);
+
+        try {
+            if (request instanceof CreateLocalHistoryRequest) {
+                return handleCreateHistory((CreateLocalHistoryRequest) request);
+            } else if (request instanceof DestroyLocalHistoryRequest) {
+                return handleDestroyHistory((DestroyLocalHistoryRequest) request);
+            } else if (request instanceof PurgeLocalHistoryRequest) {
+                return handlePurgeHistory((PurgeLocalHistoryRequest)request);
+            } else {
+                throw new UnsupportedRequestException(request);
+            }
+        } finally {
+            expectNextRequest();
+        }
+    }
+
+    private LocalHistorySuccess handleCreateHistory(final CreateLocalHistoryRequest request) throws RequestException {
+        final LocalHistoryIdentifier id = request.getTarget();
+        final AbstractFrontendHistory existing = localHistories.get(id);
+        if (existing != null) {
+            // History already exists: report success
+            LOG.debug("{}: history {} already exists", persistenceId, id);
+            return new LocalHistorySuccess(id, request.getSequence());
+        }
+
+        // We have not found the history. Before we create it we need to check history ID sequencing so that we do not
+        // end up resurrecting a purged history.
+        if (purgedHistories.contains(UnsignedLong.fromLongBits(id.getHistoryId()))) {
+            LOG.debug("{}: rejecting purged request {}", persistenceId, request);
+            throw new DeadHistoryException(lastSeenHistory.longValue());
+        }
+
+        // Update last history we have seen
+        if (lastSeenHistory != null && Long.compareUnsigned(lastSeenHistory, id.getHistoryId()) < 0) {
+            lastSeenHistory = id.getHistoryId();
+        }
+
+        localHistories.put(id, new LocalFrontendHistory(persistenceId, tree.ensureTransactionChain(id)));
+        LOG.debug("{}: created history {}", persistenceId, id);
+        return new LocalHistorySuccess(id, request.getSequence());
+    }
+
+    private LocalHistorySuccess handleDestroyHistory(final DestroyLocalHistoryRequest request) throws RequestException {
+        final LocalHistoryIdentifier id = request.getTarget();
+        final LocalFrontendHistory existing = localHistories.get(id);
+        if (existing == null) {
+            // History does not exist: report success
+            LOG.debug("{}: history {} does not exist, nothing to destroy", persistenceId, id);
+            return new LocalHistorySuccess(id, request.getSequence());
+        }
+
+        return existing.destroy(request.getSequence());
+    }
+
+    private LocalHistorySuccess handlePurgeHistory(final PurgeLocalHistoryRequest request) throws RequestException {
+        final LocalHistoryIdentifier id = request.getTarget();
+        final LocalFrontendHistory existing = localHistories.remove(id);
+        if (existing != null) {
+            purgedHistories.add(Range.singleton(UnsignedLong.fromLongBits(id.getHistoryId())));
+
+            if (!existing.isDestroyed()) {
+                LOG.warn("{}: purging undestroyed history {}", persistenceId, id);
+                existing.destroy(request.getSequence());
+            }
+
+            // FIXME: record a PURGE tombstone in the journal
+
+            LOG.debug("{}: purged history {}", persistenceId, id);
+        } else {
+            LOG.debug("{}: history {} has already been purged", persistenceId, id);
+        }
+
+        return new LocalHistorySuccess(id, request.getSequence());
+    }
+
+    @Nullable TransactionSuccess<?> handleTransactionRequest(final TransactionRequest<?> request,
+            final RequestEnvelope envelope) throws RequestException {
+        checkRequestSequence(envelope);
+
+        try {
+            final LocalHistoryIdentifier lhId = request.getTarget().getHistoryId();
+            final AbstractFrontendHistory history;
+
+            if (lhId.getHistoryId() != 0) {
+                history = localHistories.get(lhId);
+                if (history == null) {
+                    LOG.debug("{}: rejecting unknown history request {}", persistenceId, request);
+                    throw new UnknownHistoryException(lastSeenHistory);
+                }
+            } else {
+                history = standaloneHistory;
+            }
+
+            return history.handleTransactionRequest(request, envelope);
+        } finally {
+            expectNextRequest();
+        }
+    }
+
+    void reconnect() {
+        expectedTxSequence = 0;
+    }
+
+    void retire() {
+        // FIXME: flush all state
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(LeaderFrontendState.class).add("clientId", clientId)
+                .add("purgedHistories", purgedHistories).toString();
+    }
+}