Bug 6714 - Use singleton service in clustered netconf topology
[netconf.git] / netconf / netconf-topology / src / test / java / org / opendaylight / netconf / topology / ActorTest.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.netconf.topology;
10
11 import static com.jayway.awaitility.Awaitility.await;
12 import static org.mockito.Matchers.any;
13 import static org.mockito.Mockito.when;
14
15 import akka.actor.ActorRef;
16 import akka.actor.ActorSystem;
17 import akka.actor.TypedActor;
18 import akka.actor.TypedActorExtension;
19 import akka.actor.TypedProps;
20 import akka.japi.Creator;
21 import com.google.common.base.Function;
22 import com.google.common.base.Optional;
23 import com.google.common.base.Preconditions;
24 import com.google.common.collect.Lists;
25 import com.google.common.util.concurrent.CheckedFuture;
26 import com.google.common.util.concurrent.FutureCallback;
27 import com.google.common.util.concurrent.Futures;
28 import com.google.common.util.concurrent.ListenableFuture;
29 import com.google.common.util.concurrent.SettableFuture;
30 import com.typesafe.config.ConfigFactory;
31 import java.util.ArrayList;
32 import java.util.Collections;
33 import java.util.List;
34 import java.util.concurrent.Callable;
35 import java.util.concurrent.ExecutorService;
36 import java.util.concurrent.Executors;
37 import java.util.concurrent.TimeUnit;
38 import javassist.ClassPool;
39 import javax.annotation.Nonnull;
40 import javax.annotation.Nullable;
41 import org.junit.Before;
42 import org.junit.Ignore;
43 import org.junit.Test;
44 import org.mockito.Mock;
45 import org.mockito.MockitoAnnotations;
46 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
47 import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
48 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
49 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
50 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
51 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
52 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
53 import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
54 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
55 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
56 import org.opendaylight.netconf.topology.NodeManagerCallback.NodeManagerCallbackFactory;
57 import org.opendaylight.netconf.topology.TopologyManagerCallback.TopologyManagerCallbackFactory;
58 import org.opendaylight.netconf.topology.example.ExampleNodeManagerCallback;
59 import org.opendaylight.netconf.topology.example.ExampleTopologyManagerCallback;
60 import org.opendaylight.netconf.topology.example.LoggingSalNodeWriter;
61 import org.opendaylight.netconf.topology.impl.NetconfNodeOperationalDataAggregator;
62 import org.opendaylight.netconf.topology.util.BaseTopologyManager;
63 import org.opendaylight.netconf.topology.util.NoopRoleChangeStrategy;
64 import org.opendaylight.netconf.topology.util.TopologyRoleChangeStrategy;
65 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Host;
66 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddress;
67 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Ipv4Address;
68 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.PortNumber;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.$YangModuleInfoImpl;
70 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
71 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeBuilder;
72 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeConnectionStatus.ConnectionStatus;
73 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.AvailableCapabilitiesBuilder;
74 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.ClusteredConnectionStatusBuilder;
75 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.UnavailableCapabilitiesBuilder;
76 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.available.capabilities.AvailableCapability;
77 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.clustered.connection.status.NodeStatus.Status;
78 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.clustered.connection.status.NodeStatusBuilder;
79 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.unavailable.capabilities.UnavailableCapability;
80 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
81 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
82 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
83 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeBuilder;
84 import org.opendaylight.yangtools.binding.data.codec.gen.impl.StreamWriterGenerator;
85 import org.opendaylight.yangtools.binding.data.codec.impl.BindingNormalizedNodeCodecRegistry;
86 import org.opendaylight.yangtools.sal.binding.generator.impl.ModuleInfoBackedContext;
87 import org.opendaylight.yangtools.sal.binding.generator.util.BindingRuntimeContext;
88 import org.opendaylight.yangtools.sal.binding.generator.util.JavassistUtils;
89 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
90 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
91 import org.slf4j.Logger;
92 import org.slf4j.LoggerFactory;
93
94 public class ActorTest {
95
96     private static final Logger LOG = LoggerFactory.getLogger(ActorTest.class);
97
98     private static final String TOPOLOGY_NETCONF = "TopologyNetconf";
99
100     @Mock
101     private EntityOwnershipService entityOwnershipService;
102
103     @Mock
104     private DataBroker dataBroker;
105
106     @Mock
107     private ReadOnlyTransaction mockedReadOnlyTx;
108
109     private static final BindingNormalizedNodeCodecRegistry CODEC_REGISTRY;
110
111     static {
112         final ModuleInfoBackedContext moduleInfoBackedContext = ModuleInfoBackedContext.create();
113         moduleInfoBackedContext.addModuleInfos(Collections.singletonList($YangModuleInfoImpl.getInstance()));
114         final Optional<SchemaContext> schemaContextOptional = moduleInfoBackedContext.tryToCreateSchemaContext();
115         Preconditions.checkState(schemaContextOptional.isPresent());
116         final SchemaContext topologySchemaCtx = schemaContextOptional.get();
117
118         final JavassistUtils javassist = JavassistUtils.forClassPool(ClassPool.getDefault());
119         CODEC_REGISTRY = new BindingNormalizedNodeCodecRegistry(StreamWriterGenerator.create(javassist));
120         CODEC_REGISTRY.onBindingRuntimeContextUpdated(BindingRuntimeContext.create(moduleInfoBackedContext, topologySchemaCtx));
121     }
122
123     private static final String PATH_MASTER = "akka.tcp://NetconfNode@127.0.0.1:2552/user/TopologyNetconf";
124     private static final String PATH_SLAVE1 = "akka.tcp://NetconfNode@127.0.0.1:2553/user/TopologyNetconf";
125     private static final String PATH_SLAVE2 = "akka.tcp://NetconfNode@127.0.0.1:2554/user/TopologyNetconf";
126
127     private static final List<String> PATHS_MASTER = Lists.newArrayList(PATH_SLAVE1, PATH_SLAVE2);
128     private static final List<String> PATHS_SLAVE1 = Lists.newArrayList(PATH_MASTER, PATH_SLAVE2);
129     private static final List<String> PATHS_SLAVE2 = Lists.newArrayList(PATH_MASTER, PATH_SLAVE1);
130
131     private static final ActorSystem ACTOR_SYSTEM = ActorSystem.create("NetconfNode", ConfigFactory.load("netconf-node1"));
132     private static final ActorSystem ACTOR_SYSTEM_SLAVE1 = ActorSystem.create("NetconfNode", ConfigFactory.load("netconf-node2"));
133     private static final ActorSystem ACTOR_SYSTEM_SLAVE2 = ActorSystem.create("NetconfNode", ConfigFactory.load("netconf-node3"));
134
135     private static final ExecutorService callbackExecutor = Executors.newFixedThreadPool(8);
136
137     private TopologyManager master = null;
138
139     @Before
140     public void setup() {
141         MockitoAnnotations.initMocks(this);
142         final SettableFuture<Optional<Topology>> settableFuture = SettableFuture.create();
143         final CheckedFuture<Optional<Topology>, ReadFailedException> checkedFuture = Futures.makeChecked(settableFuture, new Function<Exception, ReadFailedException>() {
144             @Nullable
145             @Override
146             public ReadFailedException apply(Exception input) {
147                 return new ReadFailedException("Dummy future should never return this");
148             }
149         });
150         settableFuture.set(Optional.<Topology>absent());
151         when(mockedReadOnlyTx.read(any(LogicalDatastoreType.class), any(InstanceIdentifier.class))).thenReturn(checkedFuture);
152         when(dataBroker.registerDataChangeListener(
153                 any(LogicalDatastoreType.class),
154                 any(InstanceIdentifier.class),
155                 any(DataChangeListener.class),
156                 any(DataChangeScope.class))).thenReturn(null);
157         when(dataBroker.newReadOnlyTransaction()).thenReturn(mockedReadOnlyTx);
158     }
159
160     private void setMaster(final TopologyManager manager) {
161
162     }
163
164     @Test
165     public void testRealActors() throws Exception {
166
167         EntityOwnershipService topoOwnership = new TestingEntityOwnershipService();
168         // load from config
169         final TopologyManager master = createManagerWithOwnership(ACTOR_SYSTEM, TOPOLOGY_NETCONF, true, createRealTopoTestingNodeCallbackFactory(), new TopologyRoleChangeStrategy(dataBroker, topoOwnership, TOPOLOGY_NETCONF, "topology-manager"));
170         Thread.sleep(1000);
171         final TopologyManager slave1 = createManagerWithOwnership(ACTOR_SYSTEM_SLAVE1, TOPOLOGY_NETCONF, false, createRealTopoTestingNodeCallbackFactory(), new TopologyRoleChangeStrategy(dataBroker, topoOwnership, TOPOLOGY_NETCONF, "topology-manager"));
172         final TopologyManager slave2 = createManagerWithOwnership(ACTOR_SYSTEM_SLAVE2, TOPOLOGY_NETCONF, false, createRealTopoTestingNodeCallbackFactory(), new TopologyRoleChangeStrategy(dataBroker, topoOwnership, TOPOLOGY_NETCONF, "topology-manager"));
173
174         await().atMost(30L, TimeUnit.SECONDS).until(new Callable<Boolean>() {
175             @Override
176             public Boolean call() throws Exception {
177                 return master.hasAllPeersUp();
178             }
179         });
180
181         final List<ListenableFuture<Node>> futures = new ArrayList<>();
182         for (int i = 0; i <= 1; i++) {
183             final String nodeid = "testing-node" + i;
184             final Node testingNode = new NodeBuilder()
185                     .setNodeId(new NodeId(nodeid))
186                     .addAugmentation(NetconfNode.class,
187                             new NetconfNodeBuilder()
188                                     .setHost(new Host(new IpAddress(new Ipv4Address("127.0.0.1"))))
189                                     .setPort(new PortNumber(10000 + i))
190                                     .build())
191                     .build();
192             final ListenableFuture<Node> nodeListenableFuture = master.onNodeCreated(new NodeId(nodeid), testingNode);
193             futures.add(nodeListenableFuture);
194             Futures.addCallback(nodeListenableFuture, new FutureCallback<Node>() {
195                 @Override
196                 public void onSuccess(Node result) {
197                     LOG.warn("Node {} created succesfully on all nodes", result.getNodeId().getValue());
198                 }
199
200                 @Override
201                 public void onFailure(Throwable t) {
202                     LOG.warn("Node creation failed. ", t);
203                 }
204             });
205         }
206
207         for (int i = 0; i <= 1; i++) {
208             final String nodeid = "testing-node" + i;
209             final Node testingNode = new NodeBuilder()
210                     .setNodeId(new NodeId(nodeid))
211                     .addAugmentation(NetconfNode.class,
212                             new NetconfNodeBuilder()
213                                     .setHost(new Host(new IpAddress(new Ipv4Address("127.0.0.1"))))
214                                     .setPort(new PortNumber(10000 + i))
215                                     .build())
216                     .build();
217             final ListenableFuture<Node> nodeListenableFuture = master.onNodeUpdated(new NodeId(nodeid), testingNode);
218             futures.add(nodeListenableFuture);
219             Futures.addCallback(nodeListenableFuture, new FutureCallback<Node>() {
220                 @Override
221                 public void onSuccess(Node result) {
222                     LOG.warn("Node {} updated succesfully on all nodes", result.getNodeId().getValue());
223                 }
224
225                 @Override
226                 public void onFailure(Throwable t) {
227                     LOG.warn("Node update failed. ", t);
228                 }
229             });
230         }
231         LOG.debug("Waiting for updates to finish");
232         Futures.allAsList(futures).get();
233
234
235         final List<ListenableFuture<Void>> deleteFutures = new ArrayList<>();
236         for (int i = 0; i <= 1; i++) {
237             final String nodeid = "testing-node" + i;
238             final ListenableFuture<Void> nodeListenableFuture = master.onNodeDeleted(new NodeId(nodeid));
239             deleteFutures.add(nodeListenableFuture);
240             Futures.addCallback(nodeListenableFuture, new FutureCallback<Void>() {
241                 @Override
242                 public void onSuccess(Void result) {
243                     LOG.warn("Node {} succesfully deleted on all nodes", nodeid);
244                 }
245
246                 @Override
247                 public void onFailure(Throwable t) {
248                     LOG.warn("Node delete failed. ", t);
249                 }
250             });
251
252         }
253         LOG.warn("All tasks submitted");
254         Futures.allAsList(futures).get();
255         Futures.allAsList(deleteFutures).get();
256
257         TypedActor.get(ACTOR_SYSTEM).stop(master);
258         TypedActor.get(ACTOR_SYSTEM_SLAVE1).stop(slave1);
259         TypedActor.get(ACTOR_SYSTEM_SLAVE2).stop(slave2);
260
261     }
262
263     // TODO seems like stopping actors is not enough to create an actor with same name, split this into multiple classes?
264     @Ignore
265     @Test
266     public void testWithDummyOwnershipService() throws Exception {
267
268         final TestingEntityOwnershipService ownershipService = new TestingEntityOwnershipService();
269         // load from config
270         final TopologyManager master = createNoopRoleChangeNode(ACTOR_SYSTEM, TOPOLOGY_NETCONF, true, createRealTopoCallbackFactory(ownershipService));
271         final TopologyManager slave1 = createNoopRoleChangeNode(ACTOR_SYSTEM_SLAVE1, TOPOLOGY_NETCONF, false, createRealTopoCallbackFactory(ownershipService));
272         final TopologyManager slave2 = createNoopRoleChangeNode(ACTOR_SYSTEM_SLAVE2, TOPOLOGY_NETCONF, false, createRealTopoCallbackFactory(ownershipService));
273
274         await().atMost(10L, TimeUnit.SECONDS).until(new Callable<Boolean>() {
275             @Override
276             public Boolean call() throws Exception {
277                 return master.hasAllPeersUp();
278             }
279         });
280
281         final List<ListenableFuture<Node>> futures = new ArrayList<>();
282         for (int i = 0; i <= 0; i++) {
283             final String nodeid = "testing-node" + i;
284             final Node testingNode = new NodeBuilder()
285                     .setNodeId(new NodeId(nodeid))
286                     .addAugmentation(NetconfNode.class,
287                             new NetconfNodeBuilder()
288                                     .setHost(new Host(new IpAddress(new Ipv4Address("127.0.0.1"))))
289                                     .setPort(new PortNumber(10000 + i))
290                                     .build())
291                     .build();
292             final ListenableFuture<Node> nodeListenableFuture = master.onNodeCreated(new NodeId(nodeid), testingNode);
293             futures.add(nodeListenableFuture);
294             Futures.addCallback(nodeListenableFuture, new FutureCallback<Node>() {
295                 @Override
296                 public void onSuccess(Node result) {
297                     LOG.warn("Node {} created succesfully on all nodes", result.getNodeId().getValue());
298                 }
299
300                 @Override
301                 public void onFailure(Throwable t) {
302                     LOG.warn("Node creation failed. ", t);
303                 }
304             });
305         }
306
307         Futures.allAsList(futures).get();
308         ownershipService.distributeOwnership();
309
310         Thread.sleep(30000);
311         TypedActor.get(ACTOR_SYSTEM).stop(master);
312         TypedActor.get(ACTOR_SYSTEM_SLAVE1).stop(slave1);
313         TypedActor.get(ACTOR_SYSTEM_SLAVE2).stop(slave2);
314     }
315
316     private TopologyManager createNoopRoleChangeNode(final ActorSystem actorSystem, final String topologyId, final boolean isMaster,
317                                                      final TopologyManagerCallbackFactory topologyManagerCallbackFactory) {
318
319         final TypedActorExtension typedActorExtension = TypedActor.get(actorSystem);
320         return typedActorExtension.typedActorOf(new TypedProps<>(TopologyManager.class, new Creator<BaseTopologyManager>() {
321             @Override
322             public BaseTopologyManager create() throws Exception {
323                 return new BaseTopologyManager(actorSystem,
324                         CODEC_REGISTRY,
325                         dataBroker,
326                         topologyId,
327                         topologyManagerCallbackFactory,
328                         new TestingSuccesfulStateAggregator(),
329                         new LoggingSalNodeWriter(),
330                         new NoopRoleChangeStrategy(),
331                         isMaster);
332             }
333         }), TOPOLOGY_NETCONF);
334     }
335
336     private TopologyManager createManagerWithOwnership(final ActorSystem actorSystem, final String topologyId, final boolean isMaster,
337                                                        final TopologyManagerCallbackFactory topologyManagerCallbackFactory, final RoleChangeStrategy roleChangeStrategy) {
338         final TypedActorExtension typedActorExtension = TypedActor.get(actorSystem);
339         return typedActorExtension.typedActorOf(new TypedProps<>(TopologyManager.class, new Creator<BaseTopologyManager>() {
340             @Override
341             public BaseTopologyManager create() throws Exception {
342                 return new BaseTopologyManager(actorSystem,
343                         CODEC_REGISTRY,
344                         dataBroker,
345                         topologyId,
346                         topologyManagerCallbackFactory,
347                         new NetconfNodeOperationalDataAggregator(),
348                         new LoggingSalNodeWriter(),
349                         roleChangeStrategy,
350                         isMaster);
351             }
352         }), TOPOLOGY_NETCONF);
353     }
354
355     private TopologyManagerCallbackFactory createRealTopoTestingNodeCallbackFactory() {
356         final NodeManagerCallbackFactory nodeManagerCallbackFactory = new NodeManagerCallbackFactory() {
357             @Override
358             public NodeManagerCallback create(String nodeId, String topologyId, ActorSystem actorSystem) {
359                 return new LoggingNodeManagerCallback();
360             }
361         };
362
363         return new TopologyManagerCallbackFactory() {
364             @Override
365             public TopologyManagerCallback create(ActorSystem actorSystem, String topologyId) {
366                 return new ExampleTopologyManagerCallback(actorSystem, dataBroker, topologyId, nodeManagerCallbackFactory, new LoggingSalNodeWriter());
367             }
368         };
369     }
370
371     private TopologyManagerCallbackFactory createRealTopoCallbackFactory(final EntityOwnershipService entityOwnershipService) {
372         final NodeManagerCallbackFactory nodeManagerCallbackFactory = new NodeManagerCallbackFactory() {
373             @Override
374             public NodeManagerCallback create(String nodeId, String topologyId, ActorSystem actorSystem) {
375                 return new ExampleNodeManagerCallback();
376             }
377         };
378
379         return new TopologyManagerCallbackFactory() {
380             @Override
381             public TopologyManagerCallback create(ActorSystem actorSystem, String topologyId) {
382                 return new ExampleTopologyManagerCallback(actorSystem, dataBroker, topologyId, nodeManagerCallbackFactory);
383             }
384         };
385     }
386
387     private TopologyManagerCallbackFactory createTestingTopoCallbackFactory() {
388         return new TopologyManagerCallbackFactory() {
389             @Override
390             public TopologyManagerCallback create(ActorSystem actorSystem, String topologyId) {
391                 return new TestingTopologyManagerCallback();
392             }
393         };
394     }
395
396     public static class LoggingNodeManagerCallback implements NodeManagerCallback {
397
398         @Nonnull
399         @Override
400         public Node getInitialState(@Nonnull NodeId nodeId, @Nonnull Node configNode) {
401             final NetconfNode netconfNode = configNode.getAugmentation(NetconfNode.class);
402             return new NodeBuilder()
403                     .setNodeId(nodeId)
404                     .addAugmentation(NetconfNode.class,
405                             new NetconfNodeBuilder()
406                                     .setHost(netconfNode.getHost())
407                                     .setPort(netconfNode.getPort())
408                                     .setConnectionStatus(ConnectionStatus.Connecting)
409                                     .setAvailableCapabilities(new AvailableCapabilitiesBuilder().setAvailableCapability(new ArrayList<AvailableCapability>()).build())
410                                     .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().setUnavailableCapability(new ArrayList<UnavailableCapability>()).build())
411                                     .setClusteredConnectionStatus(
412                                             new ClusteredConnectionStatusBuilder()
413                                                     .setNodeStatus(
414                                                             Lists.newArrayList(
415                                                                     new NodeStatusBuilder()
416                                                                             .setNode("testing-node")
417                                                                             .setStatus(Status.Unavailable)
418                                                                             .build()))
419                                                     .build())
420                                     .build())
421                     .build();
422         }
423
424         @Nonnull
425         @Override
426         public Node getFailedState(@Nonnull NodeId nodeId, @Nonnull Node configNode) {
427             final NetconfNode netconfNode = configNode.getAugmentation(NetconfNode.class);
428             return new NodeBuilder()
429                     .setNodeId(nodeId)
430                     .addAugmentation(NetconfNode.class,
431                             new NetconfNodeBuilder()
432                                     .setHost(netconfNode.getHost())
433                                     .setPort(netconfNode.getPort())
434                                     .setConnectionStatus(ConnectionStatus.UnableToConnect)
435                                     .setAvailableCapabilities(new AvailableCapabilitiesBuilder().setAvailableCapability(new ArrayList<AvailableCapability>()).build())
436                                     .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().setUnavailableCapability(new ArrayList<UnavailableCapability>()).build())
437                                     .setClusteredConnectionStatus(
438                                             new ClusteredConnectionStatusBuilder()
439                                                     .setNodeStatus(
440                                                             Collections.singletonList(
441                                                                     new NodeStatusBuilder()
442                                                                             .setNode("testing-node")
443                                                                             .setStatus(Status.Failed)
444                                                                             .build()))
445                                                     .build())
446                                     .build())
447                     .build();
448         }
449
450         @Nonnull
451         @Override
452         public ListenableFuture<Node> onNodeCreated(@Nonnull NodeId nodeId, @Nonnull Node configNode) {
453             LOG.debug("Creating node {} with config {}", nodeId, configNode);
454             final NetconfNode augmentation = configNode.getAugmentation(NetconfNode.class);
455             return Futures.immediateFuture(new NodeBuilder()
456                     .setNodeId(nodeId)
457                     .addAugmentation(NetconfNode.class,
458                             new NetconfNodeBuilder()
459                                     .setConnectionStatus(ConnectionStatus.Connected)
460                                     .setHost(augmentation.getHost())
461                                     .setPort(augmentation.getPort())
462                                     .setAvailableCapabilities(new AvailableCapabilitiesBuilder().setAvailableCapability(new ArrayList<AvailableCapability>()).build())
463                                     .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().setUnavailableCapability(new ArrayList<UnavailableCapability>()).build())
464                                     .setClusteredConnectionStatus(
465                                             new ClusteredConnectionStatusBuilder()
466                                                     .setNodeStatus(
467                                                             Collections.singletonList(
468                                                                     new NodeStatusBuilder()
469                                                                             .setNode("testing-node")
470                                                                             .setStatus(Status.Connected)
471                                                                             .build()))
472                                                     .build())
473                                     .build())
474                     .build());
475         }
476
477         @Nonnull
478         @Override
479         public ListenableFuture<Node> onNodeUpdated(@Nonnull NodeId nodeId, @Nonnull Node configNode) {
480             LOG.debug("Updating node {} with config {}", nodeId, configNode);
481             final NetconfNode augmentation = configNode.getAugmentation(NetconfNode.class);
482             return Futures.immediateFuture(new NodeBuilder()
483                     .setNodeId(nodeId)
484                     .addAugmentation(NetconfNode.class,
485                             new NetconfNodeBuilder()
486                                     .setConnectionStatus(ConnectionStatus.Connected)
487                                     .setHost(augmentation.getHost())
488                                     .setPort(augmentation.getPort())
489                                     .setAvailableCapabilities(new AvailableCapabilitiesBuilder().setAvailableCapability(new ArrayList<AvailableCapability>()).build())
490                                     .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().setUnavailableCapability(new ArrayList<UnavailableCapability>()).build())
491                                     .setClusteredConnectionStatus(
492                                             new ClusteredConnectionStatusBuilder()
493                                                     .setNodeStatus(
494                                                             Collections.singletonList(
495                                                                     new NodeStatusBuilder()
496                                                                             .setNode("testing-node")
497                                                                             .setStatus(Status.Connected)
498                                                                             .build()))
499                                                     .build())
500                                     .build())
501                     .build());
502         }
503
504         @Nonnull
505         @Override
506         public ListenableFuture<Void> onNodeDeleted(@Nonnull NodeId nodeId) {
507             LOG.debug("Deleting node {}", nodeId);
508             return Futures.immediateFuture(null);
509         }
510
511         @Nonnull
512         @Override
513         public ListenableFuture<Node> getCurrentStatusForNode(@Nonnull NodeId nodeId) {
514             return null;
515         }
516
517         @Override
518         public void onRoleChanged(RoleChangeDTO roleChangeDTO) {
519
520         }
521
522         @Override
523         public void onReceive(Object o, ActorRef actorRef) {
524
525         }
526
527         @Override
528         public void onDeviceConnected(SchemaContext remoteSchemaContext, NetconfSessionPreferences netconfSessionPreferences, DOMRpcService deviceRpc) {
529
530         }
531
532         @Override
533         public void onDeviceDisconnected() {
534
535         }
536
537         @Override
538         public void onDeviceFailed(Throwable throwable) {
539
540         }
541
542         @Override
543         public void onNotification(DOMNotification domNotification) {
544
545         }
546
547         @Override
548         public void close() {
549
550         }
551     }
552
553     public static class TestingTopologyManagerCallback implements TopologyManagerCallback {
554
555         public TestingTopologyManagerCallback() {
556
557         }
558
559         @Override
560         public ListenableFuture<Node> onNodeCreated(NodeId nodeId, Node node) {
561             LOG.warn("Actor system that called this: {}", TypedActor.context().system().settings().toString());
562             return Futures.immediateFuture(new NodeBuilder()
563                     .setNodeId(nodeId)
564                     .addAugmentation(NetconfNode.class,
565                             new NetconfNodeBuilder()
566                                     .setConnectionStatus(ConnectionStatus.Connected)
567                                     .setHost(new Host(new IpAddress(new Ipv4Address("127.0.0.1"))))
568                                     .setPort(new PortNumber(2555))
569                                     .setAvailableCapabilities(new AvailableCapabilitiesBuilder().setAvailableCapability(new ArrayList<AvailableCapability>()).build())
570                                     .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().setUnavailableCapability(new ArrayList<UnavailableCapability>()).build())
571                                     .build())
572                     .build());
573         }
574
575         @Override
576         public ListenableFuture<Node> onNodeUpdated(NodeId nodeId, Node node) {
577             LOG.warn("Actor system that called this: {}", TypedActor.context().system().settings().toString());
578             LOG.debug("Update called on node {}, with config {}", nodeId.getValue(), node);
579             return Futures.immediateFuture(new NodeBuilder()
580                     .setNodeId(nodeId)
581                     .addAugmentation(NetconfNode.class,
582                             new NetconfNodeBuilder()
583                                     .setConnectionStatus(ConnectionStatus.Connected)
584                                     .setHost(new Host(new IpAddress(new Ipv4Address("127.0.0.1"))))
585                                     .setPort(new PortNumber(65535))
586                                     .setAvailableCapabilities(new AvailableCapabilitiesBuilder().setAvailableCapability(new ArrayList<AvailableCapability>()).build())
587                                     .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().setUnavailableCapability(new ArrayList<UnavailableCapability>()).build())
588                                     .build())
589                     .build());
590         }
591
592         @Override
593         public ListenableFuture<Void> onNodeDeleted(NodeId nodeId) {
594             LOG.debug("Delete called on node {}", nodeId.getValue());
595             return Futures.immediateFuture(null);
596         }
597
598         @Nonnull
599         @Override
600         public ListenableFuture<Node> getCurrentStatusForNode(@Nonnull NodeId nodeId) {
601             return null;
602         }
603
604         @Override
605         public void onRoleChanged(RoleChangeDTO roleChangeDTO) {
606
607         }
608
609         @Override
610         public void onReceive(Object o, ActorRef actorRef) {
611
612         }
613
614         @Nonnull
615         @Override
616         public Node getInitialState(@Nonnull NodeId nodeId, @Nonnull Node configNode) {
617             return new NodeBuilder()
618                     .setNodeId(nodeId)
619                     .addAugmentation(NetconfNode.class,
620                             new NetconfNodeBuilder()
621                                     .setConnectionStatus(ConnectionStatus.Connecting)
622                                     .setHost(new Host(new IpAddress(new Ipv4Address("127.0.0.1"))))
623                                     .setPort(new PortNumber(65535))
624                                     .setAvailableCapabilities(new AvailableCapabilitiesBuilder().setAvailableCapability(new ArrayList<AvailableCapability>()).build())
625                                     .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().setUnavailableCapability(new ArrayList<UnavailableCapability>()).build())
626                                     .build())
627                     .build();
628         }
629
630         @Nonnull
631         @Override
632         public Node getFailedState(@Nonnull NodeId nodeId, @Nonnull Node configNode) {
633             return new NodeBuilder()
634                     .setNodeId(nodeId)
635                     .addAugmentation(NetconfNode.class,
636                             new NetconfNodeBuilder()
637                                     .setConnectionStatus(ConnectionStatus.UnableToConnect)
638                                     .setHost(new Host(new IpAddress(new Ipv4Address("127.0.0.1"))))
639                                     .setPort(new PortNumber(65535))
640                                     .setAvailableCapabilities(new AvailableCapabilitiesBuilder().setAvailableCapability(new ArrayList<AvailableCapability>()).build())
641                                     .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().setUnavailableCapability(new ArrayList<UnavailableCapability>()).build())
642                                     .build())
643                     .build();
644         }
645     }
646
647     public class TestingSuccesfulStateAggregator implements StateAggregator {
648
649         @Override
650         public ListenableFuture<Node> combineCreateAttempts(List<ListenableFuture<Node>> stateFutures) {
651             final SettableFuture<Node> future = SettableFuture.create();
652             final ListenableFuture<List<Node>> allAsList = Futures.allAsList(stateFutures);
653             Futures.addCallback(allAsList, new FutureCallback<List<Node>>() {
654                 @Override
655                 public void onSuccess(List<Node> result) {
656                     for (int i = 0; i < result.size() - 1; i++) {
657                         if (!result.get(i).equals(result.get(i + 1))) {
658                             LOG.warn("Node 1 {}: {}", result.get(i).getClass(), result.get(i));
659                             LOG.warn("Node 2 {}: {}", result.get(i + 1).getClass(), result.get(i + 1));
660                             future.setException(new IllegalStateException("Create futures have different result"));
661                             LOG.warn("Future1 : {}  Future2 : {}", result.get(i), result.get(i+1));
662                         }
663                     }
664                     future.set(result.get(0));
665                 }
666
667                 @Override
668                 public void onFailure(Throwable t) {
669                     LOG.error("One of the combined create attempts failed {}", t);
670                     future.setException(t);
671                 }
672             }, TypedActor.context().dispatcher());
673
674             return future;
675         }
676
677         @Override
678         public ListenableFuture<Node> combineUpdateAttempts(List<ListenableFuture<Node>> stateFutures) {
679             final SettableFuture<Node> future = SettableFuture.create();
680             final ListenableFuture<List<Node>> allAsList = Futures.allAsList(stateFutures);
681             Futures.addCallback(allAsList, new FutureCallback<List<Node>>() {
682                 @Override
683                 public void onSuccess(List<Node> result) {
684                     for (int i = 0; i < result.size() - 1; i++) {
685                         if (!result.get(i).equals(result.get(i + 1))) {
686                             future.setException(new IllegalStateException("Update futures have different result"));
687                         }
688                     }
689                     future.set(result.get(0));
690                 }
691
692                 @Override
693                 public void onFailure(Throwable t) {
694                     LOG.error("One of the combined update attempts failed {}", t);
695                     future.setException(t);
696                 }
697             });
698             return future;
699         }
700
701         @Override
702         public ListenableFuture<Void> combineDeleteAttempts(List<ListenableFuture<Void>> stateFutures) {
703             final SettableFuture<Void> future = SettableFuture.create();
704             final ListenableFuture<List<Void>> allAsList = Futures.allAsList(stateFutures);
705             Futures.addCallback(allAsList, new FutureCallback<List<Void>>() {
706                 @Override
707                 public void onSuccess(List<Void> result) {
708                     future.set(null);
709                 }
710
711                 @Override
712                 public void onFailure(Throwable t) {
713                     LOG.error("One of the combined delete attempts failed {}", t);
714                     future.setException(t);
715                 }
716             });
717             return future;
718         }
719     }
720 }