BUG-7033: Fix commit exception due to pipe-lining
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / AbstractFrontendHistory.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import com.google.common.base.Preconditions;
11 import com.google.common.base.Ticker;
12 import java.util.HashMap;
13 import java.util.Map;
14 import java.util.Optional;
15 import javax.annotation.Nullable;
16 import org.opendaylight.controller.cluster.access.commands.AbstractReadTransactionRequest;
17 import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
18 import org.opendaylight.controller.cluster.access.commands.OutOfOrderRequestException;
19 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
20 import org.opendaylight.controller.cluster.access.commands.TransactionSuccess;
21 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
22 import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
23 import org.opendaylight.controller.cluster.access.concepts.RequestException;
24 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
25 import org.opendaylight.yangtools.concepts.Identifiable;
26 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
29
30 /**
31  * Abstract class for providing logical tracking of frontend local histories. This class is specialized for
32  * standalone transactions and chained transactions.
33  *
34  * @author Robert Varga
35  */
36 abstract class AbstractFrontendHistory implements Identifiable<LocalHistoryIdentifier> {
37     private static final Logger LOG = LoggerFactory.getLogger(AbstractFrontendHistory.class);
38     private static final OutOfOrderRequestException UNSEQUENCED_START = new OutOfOrderRequestException(0);
39
40     private final Map<TransactionIdentifier, FrontendTransaction> transactions = new HashMap<>();
41     private final String persistenceId;
42     private final Ticker ticker;
43
44     AbstractFrontendHistory(final String persistenceId, final Ticker ticker) {
45         this.persistenceId = Preconditions.checkNotNull(persistenceId);
46         this.ticker = Preconditions.checkNotNull(ticker);
47     }
48
49     final String persistenceId() {
50         return persistenceId;
51     }
52
53     final long readTime() {
54         return ticker.read();
55     }
56
57     final @Nullable TransactionSuccess<?> handleTransactionRequest(final TransactionRequest<?> request,
58             final RequestEnvelope envelope, final long now) throws RequestException {
59
60         // FIXME: handle purging of transactions
61
62         final TransactionIdentifier id = request.getTarget();
63         FrontendTransaction tx = transactions.get(id);
64         if (tx == null) {
65             // The transaction does not exist and we are about to create it, check sequence number
66             if (request.getSequence() != 0) {
67                 LOG.debug("{}: no transaction state present, unexpected request {}", persistenceId(), request);
68                 throw UNSEQUENCED_START;
69             }
70
71             tx = createTransaction(request, id);
72             transactions.put(id, tx);
73         } else {
74             final Optional<TransactionSuccess<?>> maybeReplay = tx.replaySequence(request.getSequence());
75             if (maybeReplay.isPresent()) {
76                 final TransactionSuccess<?> replay = maybeReplay.get();
77                 LOG.debug("{}: envelope {} replaying response {}", persistenceId(), envelope, replay);
78                 return replay;
79             }
80         }
81
82         return tx.handleRequest(request, envelope, now);
83     }
84
85     private FrontendTransaction createTransaction(final TransactionRequest<?> request, final TransactionIdentifier id)
86             throws RequestException {
87         if (request instanceof CommitLocalTransactionRequest) {
88             LOG.debug("{}: allocating new ready transaction {}", persistenceId(), id);
89             return createReadyTransaction(id, ((CommitLocalTransactionRequest) request).getModification());
90         }
91         if (request instanceof AbstractReadTransactionRequest) {
92             if (((AbstractReadTransactionRequest<?>) request).isSnapshotOnly()) {
93                 LOG.debug("{}: allocatint new open snapshot {}", persistenceId(), id);
94                 return createOpenSnapshot(id);
95             }
96         }
97
98         LOG.debug("{}: allocating new open transaction {}", persistenceId(), id);
99         return createOpenTransaction(id);
100     }
101
102     abstract FrontendTransaction createOpenSnapshot(TransactionIdentifier id) throws RequestException;
103
104     abstract FrontendTransaction createOpenTransaction(TransactionIdentifier id) throws RequestException;
105
106     abstract FrontendTransaction createReadyTransaction(TransactionIdentifier id, DataTreeModification mod)
107         throws RequestException;
108
109     abstract ShardDataTreeCohort createReadyCohort(final TransactionIdentifier id, final DataTreeModification mod);
110 }