f75d443fc79f4acc9d2b1007ae941f6c1482b494
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / LocalProxyTransaction.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
9
10 import com.google.common.base.Optional;
11 import com.google.common.base.Preconditions;
12 import com.google.common.util.concurrent.CheckedFuture;
13 import com.google.common.util.concurrent.Futures;
14 import java.util.function.Consumer;
15 import javax.annotation.Nullable;
16 import javax.annotation.concurrent.NotThreadSafe;
17 import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest;
18 import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
19 import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest;
20 import org.opendaylight.controller.cluster.access.commands.ExistsTransactionSuccess;
21 import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
22 import org.opendaylight.controller.cluster.access.commands.ReadTransactionRequest;
23 import org.opendaylight.controller.cluster.access.commands.ReadTransactionSuccess;
24 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
25 import org.opendaylight.controller.cluster.access.concepts.Response;
26 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
27 import org.opendaylight.controller.cluster.datastore.util.AbstractDataTreeModificationCursor;
28 import org.opendaylight.mdsal.common.api.ReadFailedException;
29 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
30 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
31 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
32 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
33 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37 /**
38  * An {@link AbstractProxyTransaction} for dispatching a transaction towards a shard leader which is co-located with
39  * the client instance.
40  *
41  * <p>
42  * It requires a {@link DataTreeSnapshot}, which is used to instantiated a new {@link DataTreeModification}. Operations
43  * are then performed on this modification and once the transaction is submitted, the modification is sent to the shard
44  * leader.
45  *
46  * <p>
47  * This class is not thread-safe as usual with transactions. Since it does not interact with the backend until the
48  * transaction is submitted, at which point this class gets out of the picture, this is not a cause for concern.
49  *
50  * @author Robert Varga
51  */
52 @NotThreadSafe
53 abstract class LocalProxyTransaction extends AbstractProxyTransaction {
54     private static final Logger LOG = LoggerFactory.getLogger(LocalProxyTransaction.class);
55
56     private final TransactionIdentifier identifier;
57
58     LocalProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier) {
59         super(parent);
60         this.identifier = Preconditions.checkNotNull(identifier);
61     }
62
63     @Override
64     public final TransactionIdentifier getIdentifier() {
65         return identifier;
66     }
67
68     abstract DataTreeSnapshot readOnlyView();
69
70     abstract void applyModifyTransactionRequest(ModifyTransactionRequest request,
71             @Nullable Consumer<Response<?, ?>> callback);
72
73     @Override
74     final CheckedFuture<Boolean, ReadFailedException> doExists(final YangInstanceIdentifier path) {
75         return Futures.immediateCheckedFuture(readOnlyView().readNode(path).isPresent());
76     }
77
78     @Override
79     final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> doRead(final YangInstanceIdentifier path) {
80         return Futures.immediateCheckedFuture(readOnlyView().readNode(path));
81     }
82
83     @Override
84     final void doAbort() {
85         sendAbort(new AbortLocalTransactionRequest(identifier, localActor()), response -> {
86             LOG.debug("Transaction {} abort completed with {}", identifier, response);
87         });
88     }
89
90     @Override
91     void handleForwardedRemoteRequest(final TransactionRequest<?> request,
92             final @Nullable Consumer<Response<?, ?>> callback) {
93         if (request instanceof ModifyTransactionRequest) {
94             applyModifyTransactionRequest((ModifyTransactionRequest) request, callback);
95         } else if (request instanceof ReadTransactionRequest) {
96             final YangInstanceIdentifier path = ((ReadTransactionRequest) request).getPath();
97             final Optional<NormalizedNode<?, ?>> result = readOnlyView().readNode(path);
98             callback.accept(new ReadTransactionSuccess(request.getTarget(), request.getSequence(), result));
99         } else if (request instanceof ExistsTransactionRequest) {
100             final YangInstanceIdentifier path = ((ExistsTransactionRequest) request).getPath();
101             final boolean result = readOnlyView().readNode(path).isPresent();
102             callback.accept(new ExistsTransactionSuccess(request.getTarget(), request.getSequence(), result));
103         } else {
104             throw new IllegalArgumentException("Unhandled request " + request);
105         }
106     }
107
108     @Override
109     void forwardToRemote(final RemoteProxyTransaction successor, final TransactionRequest<?> request,
110             final Consumer<Response<?, ?>> callback) {
111         if (request instanceof CommitLocalTransactionRequest) {
112             final CommitLocalTransactionRequest req = (CommitLocalTransactionRequest) request;
113             final DataTreeModification mod = req.getModification();
114
115             LOG.debug("Applying modification {} to successor {}", mod, successor);
116             mod.applyToCursor(new AbstractDataTreeModificationCursor() {
117                 @Override
118                 public void write(final PathArgument child, final NormalizedNode<?, ?> data) {
119                     successor.write(current().node(child), data);
120                 }
121
122                 @Override
123                 public void merge(final PathArgument child, final NormalizedNode<?, ?> data) {
124                     successor.merge(current().node(child), data);
125                 }
126
127                 @Override
128                 public void delete(final PathArgument child) {
129                     successor.delete(current().node(child));
130                 }
131             });
132
133             successor.seal();
134
135             final ModifyTransactionRequest successorReq = successor.commitRequest(req.isCoordinated());
136             successor.sendRequest(successorReq, callback);
137         } else if (request instanceof AbortLocalTransactionRequest) {
138             LOG.debug("Forwarding abort {} to successor {}", request, successor);
139             successor.abort();
140         } else {
141             throw new IllegalArgumentException("Unhandled request" + request);
142         }
143     }
144
145     @Override
146     void forwardToLocal(final LocalProxyTransaction successor, final TransactionRequest<?> request,
147             final Consumer<Response<?, ?>> callback) {
148         if (request instanceof AbortLocalTransactionRequest) {
149             successor.sendAbort(request, callback);
150         } else {
151             throw new IllegalArgumentException("Unhandled request" + request);
152         }
153
154         LOG.debug("Forwarded request {} to successor {}", request, successor);
155     }
156
157     void sendAbort(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
158         sendRequest(request, callback);
159     }
160 }