/*
 * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.netconf.topology.singleton.impl.netconf;
10 import static org.opendaylight.mdsal.common.api.LogicalDatastoreType.CONFIGURATION;
11 import static org.opendaylight.mdsal.common.api.LogicalDatastoreType.OPERATIONAL;
13 import akka.actor.ActorRef;
14 import akka.dispatch.OnComplete;
15 import akka.util.Timeout;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import com.google.common.util.concurrent.SettableFuture;
18 import java.util.ArrayList;
19 import java.util.Collection;
20 import java.util.List;
21 import java.util.Optional;
22 import java.util.function.Consumer;
23 import org.checkerframework.checker.lock.qual.GuardedBy;
24 import org.eclipse.jdt.annotation.NonNull;
25 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
26 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
27 import org.opendaylight.netconf.api.EffectiveOperation;
28 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceId;
29 import org.opendaylight.netconf.dom.api.NetconfDataTreeService;
30 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
31 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34 import scala.concurrent.ExecutionContext;
35 import scala.concurrent.Future;
38 * ProxyNetconfService uses provided {@link ActorRef} to delegate method calls to master
39 * {@link org.opendaylight.netconf.topology.singleton.impl.actors.NetconfDataTreeServiceActor}.
41 public class ProxyNetconfService implements NetconfDataTreeService {
42 private static final Logger LOG = LoggerFactory.getLogger(ProxyNetconfService.class);
44 private final RemoteDeviceId id;
45 @GuardedBy("queuedOperations")
46 private final List<Consumer<ProxyNetconfServiceFacade>> queuedOperations = new ArrayList<>();
48 private volatile ProxyNetconfServiceFacade netconfFacade;
50 public ProxyNetconfService(final RemoteDeviceId id, final Future<Object> masterActorFuture,
51 final ExecutionContext executionContext, final Timeout askTimeout) {
53 masterActorFuture.onComplete(new OnComplete<>() {
55 public void onComplete(final Throwable failure, final Object masterActor) {
56 final ProxyNetconfServiceFacade newNetconfFacade;
57 if (failure != null) {
58 LOG.debug("{}: Failed to obtain master actor", id, failure);
59 newNetconfFacade = new FailedProxyNetconfServiceFacade(id, failure);
61 LOG.debug("{}: Obtained master actor {}", id, masterActor);
62 newNetconfFacade = new ActorProxyNetconfServiceFacade((ActorRef) masterActor, id,
63 executionContext, askTimeout);
65 executePriorNetconfOperations(newNetconfFacade);
71 public ListenableFuture<DOMRpcResult> lock() {
72 LOG.debug("{}: Lock", id);
73 final SettableFuture<DOMRpcResult> future = SettableFuture.create();
74 processNetconfOperation(facade -> future.setFuture(facade.lock()));
79 public ListenableFuture<DOMRpcResult> unlock() {
80 LOG.debug("{}: Unlock", id);
81 final SettableFuture<DOMRpcResult> future = SettableFuture.create();
82 processNetconfOperation(facade -> future.setFuture(facade.unlock()));
87 public ListenableFuture<DOMRpcResult> discardChanges() {
88 LOG.debug("{}: Discard changes", id);
89 final SettableFuture<DOMRpcResult> future = SettableFuture.create();
90 processNetconfOperation(facade -> future.setFuture(facade.discardChanges()));
95 public ListenableFuture<Optional<NormalizedNode>> get(final YangInstanceIdentifier path) {
96 LOG.debug("{}: Get {} {}", id, OPERATIONAL, path);
97 final SettableFuture<Optional<NormalizedNode>> returnFuture = SettableFuture.create();
98 processNetconfOperation(facade -> returnFuture.setFuture(facade.get(path)));
103 public ListenableFuture<Optional<NormalizedNode>> get(final YangInstanceIdentifier path,
104 final List<YangInstanceIdentifier> fields) {
105 LOG.debug("{}: Get {} {} with fields: {}", id, OPERATIONAL, path, fields);
106 final SettableFuture<Optional<NormalizedNode>> returnFuture = SettableFuture.create();
107 processNetconfOperation(facade -> returnFuture.setFuture(facade.get(path, fields)));
112 public ListenableFuture<Optional<NormalizedNode>> getConfig(final YangInstanceIdentifier path) {
113 LOG.debug("{}: Get config {} {}", id, CONFIGURATION, path);
114 final SettableFuture<Optional<NormalizedNode>> returnFuture = SettableFuture.create();
115 processNetconfOperation(facade -> returnFuture.setFuture(facade.getConfig(path)));
120 public ListenableFuture<Optional<NormalizedNode>> getConfig(final YangInstanceIdentifier path,
121 final List<YangInstanceIdentifier> fields) {
122 LOG.debug("{}: Get config {} {} with fields: {}", id, CONFIGURATION, path, fields);
123 final SettableFuture<Optional<NormalizedNode>> returnFuture = SettableFuture.create();
124 processNetconfOperation(facade -> returnFuture.setFuture(facade.getConfig(path, fields)));
129 public ListenableFuture<? extends DOMRpcResult> merge(final LogicalDatastoreType store,
130 final YangInstanceIdentifier path, final NormalizedNode data,
131 final Optional<EffectiveOperation> defaultOperation) {
132 LOG.debug("{}: Merge {} {}", id, store, path);
133 final SettableFuture<DOMRpcResult> returnFuture = SettableFuture.create();
134 processNetconfOperation(facade -> returnFuture.setFuture(facade.merge(store, path, data, defaultOperation)));
139 public ListenableFuture<? extends DOMRpcResult> replace(final LogicalDatastoreType store,
140 final YangInstanceIdentifier path, final NormalizedNode data,
141 final Optional<EffectiveOperation> defaultOperation) {
142 LOG.debug("{}: Replace {} {}", id, store, path);
143 final SettableFuture<DOMRpcResult> returnFuture = SettableFuture.create();
144 processNetconfOperation(facade -> returnFuture.setFuture(facade.replace(store, path, data, defaultOperation)));
149 public ListenableFuture<? extends DOMRpcResult> create(final LogicalDatastoreType store,
150 final YangInstanceIdentifier path, final NormalizedNode data,
151 final Optional<EffectiveOperation> defaultOperation) {
152 LOG.debug("{}: Create {} {}", id, store, path);
153 final SettableFuture<DOMRpcResult> returnFuture = SettableFuture.create();
154 processNetconfOperation(facade -> returnFuture.setFuture(facade.create(store, path, data, defaultOperation)));
159 public ListenableFuture<? extends DOMRpcResult> delete(final LogicalDatastoreType store,
160 final YangInstanceIdentifier path) {
161 LOG.debug("{}: Delete {} {}", id, store, path);
162 final SettableFuture<DOMRpcResult> returnFuture = SettableFuture.create();
163 processNetconfOperation(facade -> returnFuture.setFuture(facade.delete(store, path)));
168 public ListenableFuture<? extends DOMRpcResult> remove(final LogicalDatastoreType store,
169 final YangInstanceIdentifier path) {
170 LOG.debug("{}: Remove {} {}", id, store, path);
171 final SettableFuture<DOMRpcResult> returnFuture = SettableFuture.create();
172 processNetconfOperation(facade -> returnFuture.setFuture(facade.remove(store, path)));
177 public ListenableFuture<? extends DOMRpcResult> commit() {
178 LOG.debug("{}: Commit", id);
179 final SettableFuture<DOMRpcResult> returnFuture = SettableFuture.create();
180 processNetconfOperation(facade -> returnFuture.setFuture(facade.commit()));
185 public @NonNull Object getDeviceId() {
189 private void processNetconfOperation(final Consumer<ProxyNetconfServiceFacade> operation) {
190 final ProxyNetconfServiceFacade facadeOnEntry;
191 synchronized (queuedOperations) {
192 if (netconfFacade == null) {
193 LOG.debug("{}: Queuing netconf operation", id);
195 queuedOperations.add(operation);
196 facadeOnEntry = null;
198 facadeOnEntry = netconfFacade;
202 if (facadeOnEntry != null) {
203 operation.accept(facadeOnEntry);
207 private void executePriorNetconfOperations(final ProxyNetconfServiceFacade newNetconfFacade) {
209 // Access to queuedOperations and netconfFacade must be protected and atomic
210 // (ie synchronized) with respect to #processNetconfOperation to handle timing
211 // issues and ensure no ProxyNetconfServiceFacade is missed and that they are processed
212 // in the order they occurred.
214 // We'll make a local copy of the queuedOperations list to handle re-entrancy
215 // in case a netconf operation results in another netconf operation being
216 // queued (eg a put operation from a client read Future callback that is notified
218 final Collection<Consumer<ProxyNetconfServiceFacade>> operationsBatch;
219 synchronized (queuedOperations) {
220 if (queuedOperations.isEmpty()) {
221 // We're done invoking the netconf operations so we can now publish the
222 // ProxyNetconfServiceFacade.
223 netconfFacade = newNetconfFacade;
227 operationsBatch = new ArrayList<>(queuedOperations);
228 queuedOperations.clear();
231 // Invoke netconf operations outside the sync block to avoid unnecessary blocking.
232 for (Consumer<ProxyNetconfServiceFacade> oper : operationsBatch) {
233 oper.accept(newNetconfFacade);