eb1dd17bfd9438fc6adff5351db71527d90b3453
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / DistributedDataStoreClientBehavior.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
9
10 import akka.actor.ActorRef;
11 import akka.actor.Status;
12 import com.google.common.base.Throwables;
13 import com.google.common.base.Verify;
14 import java.util.Map;
15 import java.util.concurrent.ConcurrentHashMap;
16 import java.util.concurrent.atomic.AtomicLong;
17 import java.util.function.Consumer;
18 import org.opendaylight.controller.cluster.access.client.ClientActorBehavior;
19 import org.opendaylight.controller.cluster.access.client.ClientActorContext;
20 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
21 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
22 import org.opendaylight.controller.cluster.access.concepts.Response;
23 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
24 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
27
28 /**
29  * {@link ClientActorBehavior} acting as an intermediary between the backend actors and the DistributedDataStore
30  * frontend.
31  *
32  * <p>
33  * This class is not visible outside of this package because it breaks the actor containment. Services provided to
34  * Java world outside of actor containment are captured in {@link DistributedDataStoreClient}.
35  *
36  * <p>
37  * IMPORTANT: this class breaks actor containment via methods implementing {@link DistributedDataStoreClient} contract.
38  *            When touching internal state, be mindful of the execution context from which execution context, Actor
39  *            or POJO, is the state being accessed or modified.
40  *
41  * <p>
42  * THREAD SAFETY: this class must always be kept thread-safe, so that both the Actor System thread and the application
43  *                threads can run concurrently. All state transitions must be made in a thread-safe manner. When in
44  *                doubt, feel free to synchronize on this object.
45  *
46  * <p>
47  * PERFORMANCE: this class lies in a performance-critical fast path. All code needs to be concise and efficient, but
48  *              performance must not come at the price of correctness. Any optimizations need to be carefully analyzed
49  *              for correctness and performance impact.
50  *
51  * <p>
52  * TRADE-OFFS: part of the functionality runs in application threads without switching contexts, which makes it ideal
53  *             for performing work and charging applications for it. That has two positive effects:
54  *             - CPU usage is distributed across applications, minimizing work done in the actor thread
55  *             - CPU usage provides back-pressure towards the application.
56  *
57  * @author Robert Varga
58  */
59 final class DistributedDataStoreClientBehavior extends ClientActorBehavior implements DistributedDataStoreClient {
60     private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStoreClientBehavior.class);
61
62     private final Map<TransactionIdentifier, ClientTransaction> transactions = new ConcurrentHashMap<>();
63     private final Map<LocalHistoryIdentifier, ClientLocalHistory> histories = new ConcurrentHashMap<>();
64     private final AtomicLong nextHistoryId = new AtomicLong(1);
65     private final AtomicLong nextTransactionId = new AtomicLong();
66     private final ModuleShardBackendResolver resolver;
67     private final SingleClientHistory singleHistory;
68
69     private volatile Throwable aborted;
70
71     DistributedDataStoreClientBehavior(final ClientActorContext context, final ActorContext actorContext) {
72         super(context);
73         resolver = new ModuleShardBackendResolver(context.getIdentifier(), actorContext);
74         singleHistory = new SingleClientHistory(this, new LocalHistoryIdentifier(getIdentifier(), 0));
75     }
76
77     //
78     //
79     // Methods below are invoked from the client actor thread
80     //
81     //
82
83     @Override
84     protected void haltClient(final Throwable cause) {
85         // If we have encountered a previous problem there is not cleanup necessary, as we have already cleaned up
86         // Thread safely is not an issue, as both this method and any failures are executed from the same (client actor)
87         // thread.
88         if (aborted != null) {
89             abortOperations(cause);
90         }
91     }
92
93     private void abortOperations(final Throwable cause) {
94         // This acts as a barrier, application threads check this after they have added an entry in the maps,
95         // and if they observe aborted being non-null, they will perform their cleanup and not return the handle.
96         aborted = cause;
97
98         for (ClientLocalHistory h : histories.values()) {
99             h.localAbort(cause);
100         }
101         histories.clear();
102
103         for (ClientTransaction t : transactions.values()) {
104             t.localAbort(cause);
105         }
106         transactions.clear();
107     }
108
109     private DistributedDataStoreClientBehavior shutdown(final ClientActorBehavior currentBehavior) {
110         abortOperations(new IllegalStateException("Client " + getIdentifier() + " has been shut down"));
111         return null;
112     }
113
114     @Override
115     protected DistributedDataStoreClientBehavior onCommand(final Object command) {
116         if (command instanceof GetClientRequest) {
117             ((GetClientRequest) command).getReplyTo().tell(new Status.Success(this), ActorRef.noSender());
118         } else {
119             LOG.warn("{}: ignoring unhandled command {}", persistenceId(), command);
120         }
121
122         return this;
123     }
124
125     //
126     //
127     // Methods below are invoked from application threads
128     //
129     //
130
131     @SuppressWarnings("checkstyle:IllegalCatch")
132     private static <K, V extends LocalAbortable> V returnIfOperational(final Map<K , V> map, final K key, final V value,
133             final Throwable aborted) {
134         Verify.verify(map.put(key, value) == null);
135
136         if (aborted != null) {
137             try {
138                 value.localAbort(aborted);
139             } catch (Exception e) {
140                 LOG.debug("Close of {} failed", value, e);
141             }
142             map.remove(key, value);
143             throw Throwables.propagate(aborted);
144         }
145
146         return value;
147     }
148
149     @Override
150     public ClientLocalHistory createLocalHistory() {
151         final LocalHistoryIdentifier historyId = new LocalHistoryIdentifier(getIdentifier(),
152             nextHistoryId.getAndIncrement());
153         final ClientLocalHistory history = new ClientLocalHistory(this, historyId);
154         LOG.debug("{}: creating a new local history {}", persistenceId(), history);
155
156         return returnIfOperational(histories, historyId, history, aborted);
157     }
158
159     @Override
160     public ClientTransaction createTransaction() {
161         final TransactionIdentifier txId = new TransactionIdentifier(singleHistory.getIdentifier(),
162             nextTransactionId.getAndIncrement());
163         final ClientTransaction tx = new ClientTransaction(singleHistory, txId);
164         LOG.debug("{}: creating a new transaction {}", persistenceId(), tx);
165
166         return returnIfOperational(transactions, txId, tx, aborted);
167     }
168
169     @Override
170     public void close() {
171         context().executeInActor(this::shutdown);
172     }
173
174     @Override
175     protected ModuleShardBackendResolver resolver() {
176         return resolver;
177     }
178
179     void transactionComplete(final ClientTransaction transaction) {
180         transactions.remove(transaction.getIdentifier());
181     }
182
183     void sendRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> completer) {
184         sendRequest(request, response -> {
185             completer.accept(response);
186             return this;
187         });
188     }
189
190 }