2 * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.netconf.topology.singleton.impl.netconf;
10 import static org.opendaylight.mdsal.common.api.LogicalDatastoreType.CONFIGURATION;
11 import static org.opendaylight.mdsal.common.api.LogicalDatastoreType.OPERATIONAL;
13 import akka.actor.ActorRef;
14 import akka.dispatch.OnComplete;
15 import akka.util.Timeout;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import com.google.common.util.concurrent.SettableFuture;
18 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
19 import java.util.ArrayList;
20 import java.util.Collection;
21 import java.util.List;
22 import java.util.Optional;
23 import java.util.function.Consumer;
24 import org.checkerframework.checker.lock.qual.GuardedBy;
25 import org.eclipse.jdt.annotation.NonNull;
26 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
27 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
28 import org.opendaylight.netconf.api.ModifyAction;
29 import org.opendaylight.netconf.dom.api.NetconfDataTreeService;
30 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
31 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
32 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35 import scala.concurrent.ExecutionContext;
36 import scala.concurrent.Future;
39 * ProxyNetconfService uses the provided {@link ActorRef} to delegate method calls to the master
40 * {@link org.opendaylight.netconf.topology.singleton.impl.actors.NetconfDataTreeServiceActor}.
42 public class ProxyNetconfService implements NetconfDataTreeService {
43 private static final Logger LOG = LoggerFactory.getLogger(ProxyNetconfService.class);
// Identifier of the remote device; it is the first argument of every log statement in this class.
45 private final RemoteDeviceId id;
// Operations submitted while netconfFacade is still null. The list serves as its own monitor:
// the visible accesses all happen inside synchronized (queuedOperations) blocks.
46 @GuardedBy("queuedOperations")
47 private final List<Consumer<ProxyNetconfServiceFacade>> queuedOperations = new ArrayList<>();
// Facade that operations are executed against. It stays null until
// executePriorNetconfOperations assigns it; the visible reads and the write are all made while
// holding the queuedOperations monitor.
49 private volatile ProxyNetconfServiceFacade netconfFacade;
/**
 * Creates a proxy that resolves its facade once {@code masterActorFuture} completes.
 *
 * <p>A completion callback is registered on the future. If the future failed, the callback builds
 * a {@code FailedProxyNetconfServiceFacade} carrying the failure; otherwise it casts the completed
 * value to {@link ActorRef} and builds an {@code ActorProxyNetconfServiceFacade}. In both cases
 * the new facade is handed to {@code executePriorNetconfOperations}.
 *
 * <p>NOTE(review): some original lines are not visible in this excerpt (for example the statement
 * that stores {@code id} in the field and the tail of the {@code onComplete} registration) -
 * confirm against the full file.
 *
 * @param id identifier of the remote device; used in log messages and passed to the facade
 * @param masterActorFuture future whose completed value is cast to {@link ActorRef}
 * @param executionContext passed on to the actor-backed facade
 * @param askTimeout passed on to the actor-backed facade
 */
51 public ProxyNetconfService(final RemoteDeviceId id, final Future<Object> masterActorFuture,
52 final ExecutionContext executionContext, final Timeout askTimeout) {
54 masterActorFuture.onComplete(new OnComplete<>() {
56 public void onComplete(final Throwable failure, final Object masterActor) {
57 final ProxyNetconfServiceFacade newNetconfFacade;
// A non-null failure means the master actor could not be obtained.
58 if (failure != null) {
59 LOG.debug("{}: Failed to obtain master actor", id, failure);
60 newNetconfFacade = new FailedProxyNetconfServiceFacade(id, failure);
// Success path: the completed value is cast to ActorRef without a prior type check.
62 LOG.debug("{}: Obtained master actor {}", id, masterActor);
63 newNetconfFacade = new ActorProxyNetconfServiceFacade((ActorRef) masterActor, id,
64 executionContext, askTimeout);
// Run whatever was queued so far against the facade that was just created.
66 executePriorNetconfOperations(newNetconfFacade);
/**
 * Delegates {@code lock()} to the facade.
 *
 * <p>The facade call is wrapped in an operation and handed to {@code processNetconfOperation};
 * the facade's future is chained into a local {@link SettableFuture} through {@code setFuture}.
 *
 * <p>NOTE(review): the return statement is not visible in this excerpt; presumably the local
 * future is what gets returned - confirm against the full file.
 */
72 public ListenableFuture<DOMRpcResult> lock() {
73 LOG.debug("{}: Lock", id);
74 final SettableFuture<DOMRpcResult> future = SettableFuture.create();
// The local future takes on the outcome of facade.lock() once the operation has been run.
75 processNetconfOperation(facade -> future.setFuture(facade.lock()));
/**
 * Delegates {@code unlock()} to the facade.
 *
 * <p>The facade call is wrapped in an operation and handed to {@code processNetconfOperation};
 * the facade's future is chained into a local {@link SettableFuture} through {@code setFuture}.
 *
 * <p>NOTE(review): the return statement is not visible in this excerpt; presumably the local
 * future is what gets returned - confirm against the full file.
 */
80 public ListenableFuture<DOMRpcResult> unlock() {
81 LOG.debug("{}: Unlock", id);
82 final SettableFuture<DOMRpcResult> future = SettableFuture.create();
// The local future takes on the outcome of facade.unlock() once the operation has been run.
83 processNetconfOperation(facade -> future.setFuture(facade.unlock()));
/**
 * Delegates {@code discardChanges()} to the facade.
 *
 * <p>The facade call is wrapped in an operation and handed to {@code processNetconfOperation};
 * the facade's future is chained into a local {@link SettableFuture} through {@code setFuture}.
 *
 * <p>NOTE(review): the return statement is not visible in this excerpt; presumably the local
 * future is what gets returned - confirm against the full file.
 */
88 public ListenableFuture<DOMRpcResult> discardChanges() {
89 LOG.debug("{}: Discard changes", id);
90 final SettableFuture<DOMRpcResult> future = SettableFuture.create();
// The local future takes on the outcome of facade.discardChanges() once the operation has run.
91 processNetconfOperation(facade -> future.setFuture(facade.discardChanges()));
/**
 * Delegates {@code get(path)} to the facade.
 *
 * <p>The debug message labels the request with {@code OPERATIONAL}. The facade call is wrapped in
 * an operation handed to {@code processNetconfOperation}, and its future is chained into a local
 * {@link SettableFuture} through {@code setFuture}.
 *
 * <p>NOTE(review): the return statement is not visible in this excerpt; presumably the local
 * future is what gets returned - confirm against the full file.
 *
 * @param path instance identifier forwarded unchanged to the facade
 */
96 public ListenableFuture<Optional<NormalizedNode>> get(final YangInstanceIdentifier path) {
97 LOG.debug("{}: Get {} {}", id, OPERATIONAL, path);
98 final SettableFuture<Optional<NormalizedNode>> returnFuture = SettableFuture.create();
// returnFuture takes on the outcome of facade.get(path) once the operation has been run.
99 processNetconfOperation(facade -> returnFuture.setFuture(facade.get(path)));
/**
 * Delegates {@code get(path, fields)} to the facade.
 *
 * <p>The debug message labels the request with {@code OPERATIONAL} and includes the fields. The
 * facade call is wrapped in an operation handed to {@code processNetconfOperation}, and its future
 * is chained into a local {@link SettableFuture} through {@code setFuture}.
 *
 * <p>NOTE(review): the return statement is not visible in this excerpt; presumably the local
 * future is what gets returned - confirm against the full file.
 *
 * @param path instance identifier forwarded unchanged to the facade
 * @param fields list of instance identifiers forwarded unchanged to the facade
 */
104 public ListenableFuture<Optional<NormalizedNode>> get(final YangInstanceIdentifier path,
105 final List<YangInstanceIdentifier> fields) {
106 LOG.debug("{}: Get {} {} with fields: {}", id, OPERATIONAL, path, fields);
107 final SettableFuture<Optional<NormalizedNode>> returnFuture = SettableFuture.create();
// returnFuture takes on the outcome of facade.get(path, fields) once the operation has been run.
108 processNetconfOperation(facade -> returnFuture.setFuture(facade.get(path, fields)));
/**
 * Delegates {@code getConfig(path)} to the facade.
 *
 * <p>The debug message labels the request with {@code CONFIGURATION}. The facade call is wrapped
 * in an operation handed to {@code processNetconfOperation}, and its future is chained into a
 * local {@link SettableFuture} through {@code setFuture}.
 *
 * <p>NOTE(review): the return statement is not visible in this excerpt; presumably the local
 * future is what gets returned - confirm against the full file.
 *
 * @param path instance identifier forwarded unchanged to the facade
 */
113 public ListenableFuture<Optional<NormalizedNode>> getConfig(final YangInstanceIdentifier path) {
114 LOG.debug("{}: Get config {} {}", id, CONFIGURATION, path);
115 final SettableFuture<Optional<NormalizedNode>> returnFuture = SettableFuture.create();
// returnFuture takes on the outcome of facade.getConfig(path) once the operation has been run.
116 processNetconfOperation(facade -> returnFuture.setFuture(facade.getConfig(path)));
/**
 * Delegates {@code getConfig(path, fields)} to the facade.
 *
 * <p>The debug message labels the request with {@code CONFIGURATION} and includes the fields. The
 * facade call is wrapped in an operation handed to {@code processNetconfOperation}, and its future
 * is chained into a local {@link SettableFuture} through {@code setFuture}.
 *
 * <p>NOTE(review): the return statement is not visible in this excerpt; presumably the local
 * future is what gets returned - confirm against the full file.
 *
 * @param path instance identifier forwarded unchanged to the facade
 * @param fields list of instance identifiers forwarded unchanged to the facade
 */
121 public ListenableFuture<Optional<NormalizedNode>> getConfig(final YangInstanceIdentifier path,
122 final List<YangInstanceIdentifier> fields) {
123 LOG.debug("{}: Get config {} {} with fields: {}", id, CONFIGURATION, path, fields);
124 final SettableFuture<Optional<NormalizedNode>> returnFuture = SettableFuture.create();
// returnFuture takes on the outcome of facade.getConfig(path, fields) once the operation has run.
125 processNetconfOperation(facade -> returnFuture.setFuture(facade.getConfig(path, fields)));
/**
 * Delegates {@code merge(store, path, data, defaultOperation)} to the facade.
 *
 * <p>The facade call is wrapped in an operation handed to {@code processNetconfOperation}, and
 * its future is chained into a local {@link SettableFuture} through {@code setFuture}. Only the
 * store and path appear in the debug message.
 *
 * <p>NOTE(review): the return statement is not visible in this excerpt; presumably the local
 * future is what gets returned - confirm against the full file.
 *
 * @param store datastore type forwarded unchanged to the facade
 * @param path instance identifier forwarded unchanged to the facade
 * @param data node forwarded unchanged to the facade
 * @param defaultOperation optional modify action forwarded unchanged to the facade
 */
130 public ListenableFuture<? extends DOMRpcResult> merge(final LogicalDatastoreType store,
131 final YangInstanceIdentifier path, final NormalizedNode data,
132 final Optional<ModifyAction> defaultOperation) {
133 LOG.debug("{}: Merge {} {}", id, store, path);
134 final SettableFuture<DOMRpcResult> returnFuture = SettableFuture.create();
// returnFuture takes on the outcome of facade.merge(...) once the operation has been run.
135 processNetconfOperation(facade -> returnFuture.setFuture(facade.merge(store, path, data, defaultOperation)));
/**
 * Delegates {@code replace(store, path, data, defaultOperation)} to the facade.
 *
 * <p>The facade call is wrapped in an operation handed to {@code processNetconfOperation}, and
 * its future is chained into a local {@link SettableFuture} through {@code setFuture}. Only the
 * store and path appear in the debug message.
 *
 * <p>NOTE(review): the return statement is not visible in this excerpt; presumably the local
 * future is what gets returned - confirm against the full file.
 *
 * @param store datastore type forwarded unchanged to the facade
 * @param path instance identifier forwarded unchanged to the facade
 * @param data node forwarded unchanged to the facade
 * @param defaultOperation optional modify action forwarded unchanged to the facade
 */
140 public ListenableFuture<? extends DOMRpcResult> replace(final LogicalDatastoreType store,
141 final YangInstanceIdentifier path, final NormalizedNode data,
142 final Optional<ModifyAction> defaultOperation) {
143 LOG.debug("{}: Replace {} {}", id, store, path);
144 final SettableFuture<DOMRpcResult> returnFuture = SettableFuture.create();
// returnFuture takes on the outcome of facade.replace(...) once the operation has been run.
145 processNetconfOperation(facade -> returnFuture.setFuture(facade.replace(store, path, data, defaultOperation)));
/**
 * Delegates {@code create(store, path, data, defaultOperation)} to the facade.
 *
 * <p>The facade call is wrapped in an operation handed to {@code processNetconfOperation}, and
 * its future is chained into a local {@link SettableFuture} through {@code setFuture}. Only the
 * store and path appear in the debug message.
 *
 * <p>NOTE(review): the return statement is not visible in this excerpt; presumably the local
 * future is what gets returned - confirm against the full file.
 *
 * @param store datastore type forwarded unchanged to the facade
 * @param path instance identifier forwarded unchanged to the facade
 * @param data node forwarded unchanged to the facade
 * @param defaultOperation optional modify action forwarded unchanged to the facade
 */
150 public ListenableFuture<? extends DOMRpcResult> create(final LogicalDatastoreType store,
151 final YangInstanceIdentifier path, final NormalizedNode data,
152 final Optional<ModifyAction> defaultOperation) {
153 LOG.debug("{}: Create {} {}", id, store, path);
154 final SettableFuture<DOMRpcResult> returnFuture = SettableFuture.create();
// returnFuture takes on the outcome of facade.create(...) once the operation has been run.
155 processNetconfOperation(facade -> returnFuture.setFuture(facade.create(store, path, data, defaultOperation)));
/**
 * Delegates {@code delete(store, path)} to the facade.
 *
 * <p>The facade call is wrapped in an operation handed to {@code processNetconfOperation}, and
 * its future is chained into a local {@link SettableFuture} through {@code setFuture}.
 *
 * <p>NOTE(review): the return statement is not visible in this excerpt; presumably the local
 * future is what gets returned - confirm against the full file.
 *
 * @param store datastore type forwarded unchanged to the facade
 * @param path instance identifier forwarded unchanged to the facade
 */
160 public ListenableFuture<? extends DOMRpcResult> delete(final LogicalDatastoreType store,
161 final YangInstanceIdentifier path) {
162 LOG.debug("{}: Delete {} {}", id, store, path);
163 final SettableFuture<DOMRpcResult> returnFuture = SettableFuture.create();
// returnFuture takes on the outcome of facade.delete(store, path) once the operation has been run.
164 processNetconfOperation(facade -> returnFuture.setFuture(facade.delete(store, path)));
/**
 * Delegates {@code remove(store, path)} to the facade.
 *
 * <p>The facade call is wrapped in an operation handed to {@code processNetconfOperation}, and
 * its future is chained into a local {@link SettableFuture} through {@code setFuture}.
 *
 * <p>NOTE(review): the return statement is not visible in this excerpt; presumably the local
 * future is what gets returned - confirm against the full file.
 *
 * @param store datastore type forwarded unchanged to the facade
 * @param path instance identifier forwarded unchanged to the facade
 */
169 public ListenableFuture<? extends DOMRpcResult> remove(final LogicalDatastoreType store,
170 final YangInstanceIdentifier path) {
171 LOG.debug("{}: Remove {} {}", id, store, path);
172 final SettableFuture<DOMRpcResult> returnFuture = SettableFuture.create();
// returnFuture takes on the outcome of facade.remove(store, path) once the operation has been run.
173 processNetconfOperation(facade -> returnFuture.setFuture(facade.remove(store, path)));
/**
 * Delegates {@code commit()} to the facade.
 *
 * <p>The facade call is wrapped in an operation handed to {@code processNetconfOperation}, and
 * its future is chained into a local {@link SettableFuture} through {@code setFuture}.
 *
 * <p>NOTE(review): the return statement is not visible in this excerpt; presumably the local
 * future is what gets returned - confirm against the full file.
 */
178 public ListenableFuture<? extends DOMRpcResult> commit() {
179 LOG.debug("{}: Commit", id);
180 final SettableFuture<DOMRpcResult> returnFuture = SettableFuture.create();
// returnFuture takes on the outcome of facade.commit() once the operation has been run.
181 processNetconfOperation(facade -> returnFuture.setFuture(facade.commit()));
/**
 * Returns the device identifier as a non-null {@code Object}.
 *
 * <p>NOTE(review): the method body is not visible in this excerpt; presumably it returns the
 * {@code id} field - confirm against the full file.
 */
186 public @NonNull Object getDeviceId() {
/**
 * Runs {@code operation} against the current facade, or queues it when no facade is set yet.
 *
 * <p>While holding the {@code queuedOperations} monitor the method checks {@code netconfFacade}:
 * if it is null the operation is appended to the queue, otherwise the facade reference is
 * captured in a local. When a facade was captured, the operation is invoked with it.
 *
 * <p>NOTE(review): the closing braces are not visible in this excerpt; the invocation appears to
 * sit after the synchronized block, i.e. outside the lock - confirm against the full file.
 *
 * @param operation callback that receives the facade to operate on
 */
190 private void processNetconfOperation(final Consumer<ProxyNetconfServiceFacade> operation) {
// Stays null when the operation was queued instead of being executed right away.
191 final ProxyNetconfServiceFacade facadeOnEntry;
192 synchronized (queuedOperations) {
193 if (netconfFacade == null) {
194 LOG.debug("{}: Queuing netconf operation", id);
196 queuedOperations.add(operation);
197 facadeOnEntry = null;
// A facade is already set: remember it so the operation can be run with it below.
199 facadeOnEntry = netconfFacade;
203 if (facadeOnEntry != null) {
204 operation.accept(facadeOnEntry);
/**
 * Runs the operations that were queued before a facade existed and publishes the facade.
 *
 * <p>Under the {@code queuedOperations} monitor: when the queue is empty, {@code netconfFacade}
 * is set to {@code newNetconfFacade}; otherwise the queue is copied into a batch and cleared.
 * Each operation in the batch is then invoked with {@code newNetconfFacade}.
 *
 * <p>NOTE(review): several original lines are absent from this excerpt (ahead of the first
 * comment, right after the facade is published, and after the final loop), so no loop header or
 * exit statement is visible here. Presumably the drain repeats until the queue is found empty -
 * confirm against the full file.
 *
 * @param newNetconfFacade facade the queued operations are run against and that is then published
 */
// The visible caller is the anonymous OnComplete callback registered in the constructor; the
// suppression below points at the SpotBugs issue given as its justification.
208 @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
209 justification = "https://github.com/spotbugs/spotbugs/issues/811")
210 private void executePriorNetconfOperations(final ProxyNetconfServiceFacade newNetconfFacade) {
212 // Access to queuedOperations and netconfFacade must be protected and atomic
213 // (ie synchronized) with respect to #processNetconfOperation to handle timing
214 // issues and ensure no queued netconf operation is missed and that they are processed
215 // in the order they occurred.
217 // We'll make a local copy of the queuedOperations list to handle re-entrancy
218 // in case a netconf operation results in another netconf operation being
219 // queued (eg a put operation issued from a client read Future callback).
221 final Collection<Consumer<ProxyNetconfServiceFacade>> operationsBatch;
222 synchronized (queuedOperations) {
223 if (queuedOperations.isEmpty()) {
224 // We're done invoking the netconf operations so we can now publish the
225 // ProxyNetconfServiceFacade.
226 netconfFacade = newNetconfFacade;
// Snapshot and empty the queue while still holding the monitor, so that operations added
// from now on land in a fresh queue.
230 operationsBatch = new ArrayList<>(queuedOperations);
231 queuedOperations.clear();
234 // Invoke netconf operations outside the sync block to avoid unnecessary blocking.
235 for (Consumer<ProxyNetconfServiceFacade> oper : operationsBatch) {
236 oper.accept(newNetconfFacade);