Bump upstreams to SNAPSHOTs
[netconf.git] / netconf / netconf-topology-singleton / src / main / java / org / opendaylight / netconf / topology / singleton / impl / netconf / ProxyNetconfService.java
1 /*
2  * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.topology.singleton.impl.netconf;
9
10 import static org.opendaylight.mdsal.common.api.LogicalDatastoreType.CONFIGURATION;
11 import static org.opendaylight.mdsal.common.api.LogicalDatastoreType.OPERATIONAL;
12
13 import akka.actor.ActorRef;
14 import akka.dispatch.OnComplete;
15 import akka.util.Timeout;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import com.google.common.util.concurrent.SettableFuture;
18 import java.util.ArrayList;
19 import java.util.Collection;
20 import java.util.List;
21 import java.util.Optional;
22 import java.util.function.Consumer;
23 import org.checkerframework.checker.lock.qual.GuardedBy;
24 import org.eclipse.jdt.annotation.NonNull;
25 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
26 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
27 import org.opendaylight.netconf.api.ModifyAction;
28 import org.opendaylight.netconf.dom.api.NetconfDataTreeService;
29 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
30 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
31 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34 import scala.concurrent.ExecutionContext;
35 import scala.concurrent.Future;
36
37 /**
38  * ProxyNetconfService uses provided {@link ActorRef} to delegate method calls to master
39  * {@link org.opendaylight.netconf.topology.singleton.impl.actors.NetconfDataTreeServiceActor}.
40  */
41 public class ProxyNetconfService implements NetconfDataTreeService {
42     private static final Logger LOG = LoggerFactory.getLogger(ProxyNetconfService.class);
43
44     private final RemoteDeviceId id;
45     @GuardedBy("queuedOperations")
46     private final List<Consumer<ProxyNetconfServiceFacade>> queuedOperations = new ArrayList<>();
47
48     private volatile ProxyNetconfServiceFacade netconfFacade;
49
50     public ProxyNetconfService(final RemoteDeviceId id, final Future<Object> masterActorFuture,
51                                final ExecutionContext executionContext, final Timeout askTimeout) {
52         this.id = id;
53         masterActorFuture.onComplete(new OnComplete<>() {
54             @Override
55             public void onComplete(final Throwable failure, final Object masterActor) {
56                 final ProxyNetconfServiceFacade newNetconfFacade;
57                 if (failure != null) {
58                     LOG.debug("{}: Failed to obtain master actor", id, failure);
59                     newNetconfFacade = new FailedProxyNetconfServiceFacade(id, failure);
60                 } else {
61                     LOG.debug("{}: Obtained master actor {}", id, masterActor);
62                     newNetconfFacade = new ActorProxyNetconfServiceFacade((ActorRef) masterActor, id,
63                         executionContext, askTimeout);
64                 }
65                 executePriorNetconfOperations(newNetconfFacade);
66             }
67         }, executionContext);
68     }
69
70     @Override
71     public ListenableFuture<DOMRpcResult> lock() {
72         LOG.debug("{}: Lock", id);
73         final SettableFuture<DOMRpcResult> future = SettableFuture.create();
74         processNetconfOperation(facade -> future.setFuture(facade.lock()));
75         return future;
76     }
77
78     @Override
79     public ListenableFuture<DOMRpcResult> unlock() {
80         LOG.debug("{}: Unlock", id);
81         final SettableFuture<DOMRpcResult> future = SettableFuture.create();
82         processNetconfOperation(facade -> future.setFuture(facade.unlock()));
83         return future;
84     }
85
86     @Override
87     public ListenableFuture<DOMRpcResult> discardChanges() {
88         LOG.debug("{}: Discard changes", id);
89         final SettableFuture<DOMRpcResult> future = SettableFuture.create();
90         processNetconfOperation(facade -> future.setFuture(facade.discardChanges()));
91         return future;
92     }
93
94     @Override
95     public ListenableFuture<Optional<NormalizedNode>> get(final YangInstanceIdentifier path) {
96         LOG.debug("{}: Get {} {}", id, OPERATIONAL, path);
97         final SettableFuture<Optional<NormalizedNode>> returnFuture = SettableFuture.create();
98         processNetconfOperation(facade -> returnFuture.setFuture(facade.get(path)));
99         return returnFuture;
100     }
101
102     @Override
103     public ListenableFuture<Optional<NormalizedNode>> get(final YangInstanceIdentifier path,
104                                                           final List<YangInstanceIdentifier> fields) {
105         LOG.debug("{}: Get {} {} with fields: {}", id, OPERATIONAL, path, fields);
106         final SettableFuture<Optional<NormalizedNode>> returnFuture = SettableFuture.create();
107         processNetconfOperation(facade -> returnFuture.setFuture(facade.get(path, fields)));
108         return returnFuture;
109     }
110
111     @Override
112     public ListenableFuture<Optional<NormalizedNode>> getConfig(final YangInstanceIdentifier path) {
113         LOG.debug("{}: Get config {} {}", id, CONFIGURATION, path);
114         final SettableFuture<Optional<NormalizedNode>> returnFuture = SettableFuture.create();
115         processNetconfOperation(facade -> returnFuture.setFuture(facade.getConfig(path)));
116         return returnFuture;
117     }
118
119     @Override
120     public ListenableFuture<Optional<NormalizedNode>> getConfig(final YangInstanceIdentifier path,
121                                                                 final List<YangInstanceIdentifier> fields) {
122         LOG.debug("{}: Get config {} {} with fields: {}", id, CONFIGURATION, path, fields);
123         final SettableFuture<Optional<NormalizedNode>> returnFuture = SettableFuture.create();
124         processNetconfOperation(facade -> returnFuture.setFuture(facade.getConfig(path, fields)));
125         return returnFuture;
126     }
127
128     @Override
129     public ListenableFuture<? extends DOMRpcResult> merge(final LogicalDatastoreType store,
130             final YangInstanceIdentifier path, final NormalizedNode data,
131             final Optional<ModifyAction> defaultOperation) {
132         LOG.debug("{}: Merge {} {}", id, store, path);
133         final SettableFuture<DOMRpcResult> returnFuture = SettableFuture.create();
134         processNetconfOperation(facade -> returnFuture.setFuture(facade.merge(store, path, data, defaultOperation)));
135         return returnFuture;
136     }
137
138     @Override
139     public ListenableFuture<? extends DOMRpcResult> replace(final LogicalDatastoreType store,
140             final YangInstanceIdentifier path, final NormalizedNode data,
141             final Optional<ModifyAction> defaultOperation) {
142         LOG.debug("{}: Replace {} {}", id, store, path);
143         final SettableFuture<DOMRpcResult> returnFuture = SettableFuture.create();
144         processNetconfOperation(facade -> returnFuture.setFuture(facade.replace(store, path, data, defaultOperation)));
145         return returnFuture;
146     }
147
148     @Override
149     public ListenableFuture<? extends DOMRpcResult> create(final LogicalDatastoreType store,
150             final YangInstanceIdentifier path, final NormalizedNode data,
151             final Optional<ModifyAction> defaultOperation) {
152         LOG.debug("{}: Create {} {}", id, store, path);
153         final SettableFuture<DOMRpcResult> returnFuture = SettableFuture.create();
154         processNetconfOperation(facade -> returnFuture.setFuture(facade.create(store, path, data, defaultOperation)));
155         return returnFuture;
156     }
157
158     @Override
159     public ListenableFuture<? extends DOMRpcResult> delete(final LogicalDatastoreType store,
160             final YangInstanceIdentifier path) {
161         LOG.debug("{}: Delete {} {}", id, store, path);
162         final SettableFuture<DOMRpcResult> returnFuture = SettableFuture.create();
163         processNetconfOperation(facade -> returnFuture.setFuture(facade.delete(store, path)));
164         return returnFuture;
165     }
166
167     @Override
168     public ListenableFuture<? extends DOMRpcResult> remove(final LogicalDatastoreType store,
169             final YangInstanceIdentifier path) {
170         LOG.debug("{}: Remove {} {}", id, store, path);
171         final SettableFuture<DOMRpcResult> returnFuture = SettableFuture.create();
172         processNetconfOperation(facade -> returnFuture.setFuture(facade.remove(store, path)));
173         return returnFuture;
174     }
175
176     @Override
177     public ListenableFuture<? extends DOMRpcResult> commit() {
178         LOG.debug("{}: Commit", id);
179         final SettableFuture<DOMRpcResult> returnFuture = SettableFuture.create();
180         processNetconfOperation(facade -> returnFuture.setFuture(facade.commit()));
181         return returnFuture;
182     }
183
184     @Override
185     public @NonNull Object getDeviceId() {
186         return id;
187     }
188
189     private void processNetconfOperation(final Consumer<ProxyNetconfServiceFacade> operation) {
190         final ProxyNetconfServiceFacade facadeOnEntry;
191         synchronized (queuedOperations) {
192             if (netconfFacade == null) {
193                 LOG.debug("{}: Queuing netconf operation", id);
194
195                 queuedOperations.add(operation);
196                 facadeOnEntry = null;
197             } else {
198                 facadeOnEntry = netconfFacade;
199             }
200         }
201
202         if (facadeOnEntry != null) {
203             operation.accept(facadeOnEntry);
204         }
205     }
206
207     private void executePriorNetconfOperations(final ProxyNetconfServiceFacade newNetconfFacade) {
208         while (true) {
209             // Access to queuedOperations and netconfFacade must be protected and atomic
210             // (ie synchronized) with respect to #processNetconfOperation to handle timing
211             // issues and ensure no ProxyNetconfServiceFacade is missed and that they are processed
212             // in the order they occurred.
213
214             // We'll make a local copy of the queuedOperations list to handle re-entrancy
215             // in case a netconf operation results in another netconf operation being
216             // queued (eg a put operation from a client read Future callback that is notified
217             // synchronously).
218             final Collection<Consumer<ProxyNetconfServiceFacade>> operationsBatch;
219             synchronized (queuedOperations) {
220                 if (queuedOperations.isEmpty()) {
221                     // We're done invoking the netconf operations so we can now publish the
222                     // ProxyNetconfServiceFacade.
223                     netconfFacade = newNetconfFacade;
224                     break;
225                 }
226
227                 operationsBatch = new ArrayList<>(queuedOperations);
228                 queuedOperations.clear();
229             }
230
231             // Invoke netconf operations outside the sync block to avoid unnecessary blocking.
232             for (Consumer<ProxyNetconfServiceFacade> oper : operationsBatch) {
233                 oper.accept(newNetconfFacade);
234             }
235         }
236     }
237 }