Bump MRI upstreams
[netconf.git] / netconf / netconf-topology-singleton / src / main / java / org / opendaylight / netconf / topology / singleton / impl / netconf / ProxyNetconfService.java
1 /*
2  * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.topology.singleton.impl.netconf;
9
10 import static org.opendaylight.mdsal.common.api.LogicalDatastoreType.CONFIGURATION;
11 import static org.opendaylight.mdsal.common.api.LogicalDatastoreType.OPERATIONAL;
12
13 import akka.actor.ActorRef;
14 import akka.dispatch.OnComplete;
15 import akka.util.Timeout;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import com.google.common.util.concurrent.SettableFuture;
18 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
19 import java.util.ArrayList;
20 import java.util.Collection;
21 import java.util.List;
22 import java.util.Optional;
23 import java.util.function.Consumer;
24 import org.checkerframework.checker.lock.qual.GuardedBy;
25 import org.eclipse.jdt.annotation.NonNull;
26 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
27 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
28 import org.opendaylight.netconf.api.ModifyAction;
29 import org.opendaylight.netconf.dom.api.NetconfDataTreeService;
30 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
31 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
32 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35 import scala.concurrent.ExecutionContext;
36 import scala.concurrent.Future;
37
38 /**
39  * ProxyNetconfService uses provided {@link ActorRef} to delegate method calls to master
40  * {@link org.opendaylight.netconf.topology.singleton.impl.actors.NetconfDataTreeServiceActor}.
41  */
42 public class ProxyNetconfService implements NetconfDataTreeService {
43     private static final Logger LOG = LoggerFactory.getLogger(ProxyNetconfService.class);
44
45     private final RemoteDeviceId id;
46     @GuardedBy("queuedOperations")
47     private final List<Consumer<ProxyNetconfServiceFacade>> queuedOperations = new ArrayList<>();
48
49     private volatile ProxyNetconfServiceFacade netconfFacade;
50
51     public ProxyNetconfService(final RemoteDeviceId id, final Future<Object> masterActorFuture,
52                                final ExecutionContext executionContext, final Timeout askTimeout) {
53         this.id = id;
54         masterActorFuture.onComplete(new OnComplete<>() {
55             @Override
56             public void onComplete(final Throwable failure, final Object masterActor) {
57                 final ProxyNetconfServiceFacade newNetconfFacade;
58                 if (failure != null) {
59                     LOG.debug("{}: Failed to obtain master actor", id, failure);
60                     newNetconfFacade = new FailedProxyNetconfServiceFacade(id, failure);
61                 } else {
62                     LOG.debug("{}: Obtained master actor {}", id, masterActor);
63                     newNetconfFacade = new ActorProxyNetconfServiceFacade((ActorRef) masterActor, id,
64                         executionContext, askTimeout);
65                 }
66                 executePriorNetconfOperations(newNetconfFacade);
67             }
68         }, executionContext);
69     }
70
71     @Override
72     public ListenableFuture<DOMRpcResult> lock() {
73         LOG.debug("{}: Lock", id);
74         final SettableFuture<DOMRpcResult> future = SettableFuture.create();
75         processNetconfOperation(facade -> future.setFuture(facade.lock()));
76         return future;
77     }
78
79     @Override
80     public ListenableFuture<DOMRpcResult> unlock() {
81         LOG.debug("{}: Unlock", id);
82         final SettableFuture<DOMRpcResult> future = SettableFuture.create();
83         processNetconfOperation(facade -> future.setFuture(facade.unlock()));
84         return future;
85     }
86
87     @Override
88     public ListenableFuture<DOMRpcResult> discardChanges() {
89         LOG.debug("{}: Discard changes", id);
90         final SettableFuture<DOMRpcResult> future = SettableFuture.create();
91         processNetconfOperation(facade -> future.setFuture(facade.discardChanges()));
92         return future;
93     }
94
95     @Override
96     public ListenableFuture<Optional<NormalizedNode>> get(final YangInstanceIdentifier path) {
97         LOG.debug("{}: Get {} {}", id, OPERATIONAL, path);
98         final SettableFuture<Optional<NormalizedNode>> returnFuture = SettableFuture.create();
99         processNetconfOperation(facade -> returnFuture.setFuture(facade.get(path)));
100         return returnFuture;
101     }
102
103     @Override
104     public ListenableFuture<Optional<NormalizedNode>> get(final YangInstanceIdentifier path,
105                                                           final List<YangInstanceIdentifier> fields) {
106         LOG.debug("{}: Get {} {} with fields: {}", id, OPERATIONAL, path, fields);
107         final SettableFuture<Optional<NormalizedNode>> returnFuture = SettableFuture.create();
108         processNetconfOperation(facade -> returnFuture.setFuture(facade.get(path, fields)));
109         return returnFuture;
110     }
111
112     @Override
113     public ListenableFuture<Optional<NormalizedNode>> getConfig(final YangInstanceIdentifier path) {
114         LOG.debug("{}: Get config {} {}", id, CONFIGURATION, path);
115         final SettableFuture<Optional<NormalizedNode>> returnFuture = SettableFuture.create();
116         processNetconfOperation(facade -> returnFuture.setFuture(facade.getConfig(path)));
117         return returnFuture;
118     }
119
120     @Override
121     public ListenableFuture<Optional<NormalizedNode>> getConfig(final YangInstanceIdentifier path,
122                                                                 final List<YangInstanceIdentifier> fields) {
123         LOG.debug("{}: Get config {} {} with fields: {}", id, CONFIGURATION, path, fields);
124         final SettableFuture<Optional<NormalizedNode>> returnFuture = SettableFuture.create();
125         processNetconfOperation(facade -> returnFuture.setFuture(facade.getConfig(path, fields)));
126         return returnFuture;
127     }
128
129     @Override
130     public ListenableFuture<? extends DOMRpcResult> merge(final LogicalDatastoreType store,
131             final YangInstanceIdentifier path, final NormalizedNode data,
132             final Optional<ModifyAction> defaultOperation) {
133         LOG.debug("{}: Merge {} {}", id, store, path);
134         final SettableFuture<DOMRpcResult> returnFuture = SettableFuture.create();
135         processNetconfOperation(facade -> returnFuture.setFuture(facade.merge(store, path, data, defaultOperation)));
136         return returnFuture;
137     }
138
139     @Override
140     public ListenableFuture<? extends DOMRpcResult> replace(final LogicalDatastoreType store,
141             final YangInstanceIdentifier path, final NormalizedNode data,
142             final Optional<ModifyAction> defaultOperation) {
143         LOG.debug("{}: Replace {} {}", id, store, path);
144         final SettableFuture<DOMRpcResult> returnFuture = SettableFuture.create();
145         processNetconfOperation(facade -> returnFuture.setFuture(facade.replace(store, path, data, defaultOperation)));
146         return returnFuture;
147     }
148
149     @Override
150     public ListenableFuture<? extends DOMRpcResult> create(final LogicalDatastoreType store,
151             final YangInstanceIdentifier path, final NormalizedNode data,
152             final Optional<ModifyAction> defaultOperation) {
153         LOG.debug("{}: Create {} {}", id, store, path);
154         final SettableFuture<DOMRpcResult> returnFuture = SettableFuture.create();
155         processNetconfOperation(facade -> returnFuture.setFuture(facade.create(store, path, data, defaultOperation)));
156         return returnFuture;
157     }
158
159     @Override
160     public ListenableFuture<? extends DOMRpcResult> delete(final LogicalDatastoreType store,
161             final YangInstanceIdentifier path) {
162         LOG.debug("{}: Delete {} {}", id, store, path);
163         final SettableFuture<DOMRpcResult> returnFuture = SettableFuture.create();
164         processNetconfOperation(facade -> returnFuture.setFuture(facade.delete(store, path)));
165         return returnFuture;
166     }
167
168     @Override
169     public ListenableFuture<? extends DOMRpcResult> remove(final LogicalDatastoreType store,
170             final YangInstanceIdentifier path) {
171         LOG.debug("{}: Remove {} {}", id, store, path);
172         final SettableFuture<DOMRpcResult> returnFuture = SettableFuture.create();
173         processNetconfOperation(facade -> returnFuture.setFuture(facade.remove(store, path)));
174         return returnFuture;
175     }
176
177     @Override
178     public ListenableFuture<? extends DOMRpcResult> commit() {
179         LOG.debug("{}: Commit", id);
180         final SettableFuture<DOMRpcResult> returnFuture = SettableFuture.create();
181         processNetconfOperation(facade -> returnFuture.setFuture(facade.commit()));
182         return returnFuture;
183     }
184
185     @Override
186     public @NonNull Object getDeviceId() {
187         return id;
188     }
189
190     private void processNetconfOperation(final Consumer<ProxyNetconfServiceFacade> operation) {
191         final ProxyNetconfServiceFacade facadeOnEntry;
192         synchronized (queuedOperations) {
193             if (netconfFacade == null) {
194                 LOG.debug("{}: Queuing netconf operation", id);
195
196                 queuedOperations.add(operation);
197                 facadeOnEntry = null;
198             } else {
199                 facadeOnEntry = netconfFacade;
200             }
201         }
202
203         if (facadeOnEntry != null) {
204             operation.accept(facadeOnEntry);
205         }
206     }
207
208     @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
209         justification = "https://github.com/spotbugs/spotbugs/issues/811")
210     private void executePriorNetconfOperations(final ProxyNetconfServiceFacade newNetconfFacade) {
211         while (true) {
212             // Access to queuedOperations and netconfFacade must be protected and atomic
213             // (ie synchronized) with respect to #processNetconfOperation to handle timing
214             // issues and ensure no ProxyNetconfServiceFacade is missed and that they are processed
215             // in the order they occurred.
216
217             // We'll make a local copy of the queuedOperations list to handle re-entrancy
218             // in case a netconf operation results in another netconf operation being
219             // queued (eg a put operation from a client read Future callback that is notified
220             // synchronously).
221             final Collection<Consumer<ProxyNetconfServiceFacade>> operationsBatch;
222             synchronized (queuedOperations) {
223                 if (queuedOperations.isEmpty()) {
224                     // We're done invoking the netconf operations so we can now publish the
225                     // ProxyNetconfServiceFacade.
226                     netconfFacade = newNetconfFacade;
227                     break;
228                 }
229
230                 operationsBatch = new ArrayList<>(queuedOperations);
231                 queuedOperations.clear();
232             }
233
234             // Invoke netconf operations outside the sync block to avoid unnecessary blocking.
235             for (Consumer<ProxyNetconfServiceFacade> oper : operationsBatch) {
236                 oper.accept(newNetconfFacade);
237             }
238         }
239     }
240 }