2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.netconf.topology;
11 import static com.jayway.awaitility.Awaitility.await;
12 import static org.mockito.Matchers.any;
13 import static org.mockito.Mockito.when;
15 import akka.actor.ActorRef;
16 import akka.actor.ActorSystem;
17 import akka.actor.TypedActor;
18 import akka.actor.TypedActorExtension;
19 import akka.actor.TypedProps;
20 import akka.japi.Creator;
21 import com.google.common.base.Optional;
22 import com.google.common.base.Preconditions;
23 import com.google.common.collect.Lists;
24 import com.google.common.util.concurrent.FutureCallback;
25 import com.google.common.util.concurrent.Futures;
26 import com.google.common.util.concurrent.ListenableFuture;
27 import com.google.common.util.concurrent.SettableFuture;
28 import com.typesafe.config.ConfigFactory;
29 import java.util.ArrayList;
30 import java.util.Collections;
31 import java.util.List;
32 import java.util.concurrent.Callable;
33 import java.util.concurrent.ExecutorService;
34 import java.util.concurrent.Executors;
35 import java.util.concurrent.TimeUnit;
36 import javassist.ClassPool;
37 import javax.annotation.Nonnull;
38 import org.junit.Before;
39 import org.junit.Ignore;
40 import org.junit.Test;
41 import org.mockito.Mock;
42 import org.mockito.MockitoAnnotations;
43 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
44 import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
45 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
46 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
47 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
48 import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
49 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
50 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
51 import org.opendaylight.netconf.topology.NodeManagerCallback.NodeManagerCallbackFactory;
52 import org.opendaylight.netconf.topology.TopologyManagerCallback.TopologyManagerCallbackFactory;
53 import org.opendaylight.netconf.topology.example.ExampleNodeManagerCallback;
54 import org.opendaylight.netconf.topology.example.ExampleTopologyManagerCallback;
55 import org.opendaylight.netconf.topology.example.LoggingSalNodeWriter;
56 import org.opendaylight.netconf.topology.impl.NetconfNodeOperationalDataAggregator;
57 import org.opendaylight.netconf.topology.util.BaseTopologyManager;
58 import org.opendaylight.netconf.topology.util.NoopRoleChangeStrategy;
59 import org.opendaylight.netconf.topology.util.TopologyRoleChangeStrategy;
60 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Host;
61 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.IpAddress;
62 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Ipv4Address;
63 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.PortNumber;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.$YangModuleInfoImpl;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeBuilder;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeConnectionStatus.ConnectionStatus;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.ClusteredConnectionStatusBuilder;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.clustered.connection.status.NodeStatus.Status;
70 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.clustered.connection.status.NodeStatusBuilder;
71 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
72 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
73 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeBuilder;
74 import org.opendaylight.yangtools.binding.data.codec.gen.impl.StreamWriterGenerator;
75 import org.opendaylight.yangtools.binding.data.codec.impl.BindingNormalizedNodeCodecRegistry;
76 import org.opendaylight.yangtools.sal.binding.generator.impl.ModuleInfoBackedContext;
77 import org.opendaylight.yangtools.sal.binding.generator.util.BindingRuntimeContext;
78 import org.opendaylight.yangtools.sal.binding.generator.util.JavassistUtils;
79 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
80 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
81 import org.slf4j.Logger;
82 import org.slf4j.LoggerFactory;
84 public class ActorTest {
86 private static final Logger LOG = LoggerFactory.getLogger(ActorTest.class);
88 private static final String TOPOLOGY_NETCONF = "TopologyNetconf";
91 private EntityOwnershipService entityOwnershipService;
94 private DataBroker dataBroker;
96 private static final BindingNormalizedNodeCodecRegistry CODEC_REGISTRY;
99 final ModuleInfoBackedContext moduleInfoBackedContext = ModuleInfoBackedContext.create();
100 moduleInfoBackedContext.addModuleInfos(Collections.singletonList($YangModuleInfoImpl.getInstance()));
101 final Optional<SchemaContext> schemaContextOptional = moduleInfoBackedContext.tryToCreateSchemaContext();
102 Preconditions.checkState(schemaContextOptional.isPresent());
103 final SchemaContext topologySchemaCtx = schemaContextOptional.get();
105 final JavassistUtils javassist = JavassistUtils.forClassPool(ClassPool.getDefault());
106 CODEC_REGISTRY = new BindingNormalizedNodeCodecRegistry(StreamWriterGenerator.create(javassist));
107 CODEC_REGISTRY.onBindingRuntimeContextUpdated(BindingRuntimeContext.create(moduleInfoBackedContext, topologySchemaCtx));
110 private static final String PATH_MASTER = "akka.tcp://NetconfNode@127.0.0.1:2552/user/TopologyNetconf";
111 private static final String PATH_SLAVE1 = "akka.tcp://NetconfNode@127.0.0.1:2553/user/TopologyNetconf";
112 private static final String PATH_SLAVE2 = "akka.tcp://NetconfNode@127.0.0.1:2554/user/TopologyNetconf";
114 private static final List<String> PATHS_MASTER = Lists.newArrayList(PATH_SLAVE1, PATH_SLAVE2);
115 private static final List<String> PATHS_SLAVE1 = Lists.newArrayList(PATH_MASTER, PATH_SLAVE2);
116 private static final List<String> PATHS_SLAVE2 = Lists.newArrayList(PATH_MASTER, PATH_SLAVE1);
118 private static final ActorSystem ACTOR_SYSTEM = ActorSystem.create("NetconfNode", ConfigFactory.load("netconf-node1"));
119 private static final ActorSystem ACTOR_SYSTEM_SLAVE1 = ActorSystem.create("NetconfNode", ConfigFactory.load("netconf-node2"));
120 private static final ActorSystem ACTOR_SYSTEM_SLAVE2 = ActorSystem.create("NetconfNode", ConfigFactory.load("netconf-node3"));
122 private static final ExecutorService callbackExecutor = Executors.newFixedThreadPool(8);
124 private TopologyManager master = null;
127 public void setup() {
128 MockitoAnnotations.initMocks(this);
129 when(dataBroker.registerDataChangeListener(
130 any(LogicalDatastoreType.class),
131 any(InstanceIdentifier.class),
132 any(DataChangeListener.class),
133 any(DataChangeScope.class))).thenReturn(null);
136 private void setMaster(final TopologyManager manager) {
141 public void testRealActors() throws Exception {
143 EntityOwnershipService topoOwnership = new TestingEntityOwnershipService();
145 final TopologyManager master = createManagerWithOwnership(ACTOR_SYSTEM, TOPOLOGY_NETCONF, true, createRealTopoTestingNodeCallbackFactory(), new TopologyRoleChangeStrategy(dataBroker, topoOwnership, TOPOLOGY_NETCONF, "topology-manager"));
147 final TopologyManager slave1 = createManagerWithOwnership(ACTOR_SYSTEM_SLAVE1, TOPOLOGY_NETCONF, false, createRealTopoTestingNodeCallbackFactory(), new TopologyRoleChangeStrategy(dataBroker, topoOwnership, TOPOLOGY_NETCONF, "topology-manager"));
148 final TopologyManager slave2 = createManagerWithOwnership(ACTOR_SYSTEM_SLAVE2, TOPOLOGY_NETCONF, false, createRealTopoTestingNodeCallbackFactory(), new TopologyRoleChangeStrategy(dataBroker, topoOwnership, TOPOLOGY_NETCONF, "topology-manager"));
150 await().atMost(30L, TimeUnit.SECONDS).until(new Callable<Boolean>() {
152 public Boolean call() throws Exception {
153 return master.hasAllPeersUp();
157 final List<ListenableFuture<Node>> futures = new ArrayList<>();
158 for (int i = 0; i <= 1; i++) {
159 final String nodeid = "testing-node" + i;
160 final Node testingNode = new NodeBuilder()
161 .setNodeId(new NodeId(nodeid))
162 .addAugmentation(NetconfNode.class,
163 new NetconfNodeBuilder()
164 .setHost(new Host(new IpAddress(new Ipv4Address("127.0.0.1"))))
165 .setPort(new PortNumber(10000 + i))
168 final ListenableFuture<Node> nodeListenableFuture = master.onNodeCreated(new NodeId(nodeid), testingNode);
169 futures.add(nodeListenableFuture);
170 Futures.addCallback(nodeListenableFuture, new FutureCallback<Node>() {
172 public void onSuccess(Node result) {
173 LOG.warn("Node {} created succesfully on all nodes", result.getNodeId().getValue());
177 public void onFailure(Throwable t) {
178 LOG.warn("Node creation failed. ", t);
183 for (int i = 0; i <= 1; i++) {
184 final String nodeid = "testing-node" + i;
185 final Node testingNode = new NodeBuilder()
186 .setNodeId(new NodeId(nodeid))
187 .addAugmentation(NetconfNode.class,
188 new NetconfNodeBuilder()
189 .setHost(new Host(new IpAddress(new Ipv4Address("127.0.0.1"))))
190 .setPort(new PortNumber(10000 + i))
193 final ListenableFuture<Node> nodeListenableFuture = master.onNodeUpdated(new NodeId(nodeid), testingNode);
194 futures.add(nodeListenableFuture);
195 Futures.addCallback(nodeListenableFuture, new FutureCallback<Node>() {
197 public void onSuccess(Node result) {
198 LOG.warn("Node {} updated succesfully on all nodes", result.getNodeId().getValue());
202 public void onFailure(Throwable t) {
203 LOG.warn("Node update failed. ", t);
209 final List<ListenableFuture<Void>> deleteFutures = new ArrayList<>();
210 for (int i = 0; i <= 1; i++) {
211 final String nodeid = "testing-node" + i;
212 final ListenableFuture<Void> nodeListenableFuture = master.onNodeDeleted(new NodeId(nodeid));
213 deleteFutures.add(nodeListenableFuture);
214 Futures.addCallback(nodeListenableFuture, new FutureCallback<Void>() {
216 public void onSuccess(Void result) {
217 LOG.warn("Node {} succesfully deleted on all nodes", nodeid);
221 public void onFailure(Throwable t) {
222 LOG.warn("Node delete failed. ", t);
227 LOG.warn("All tasks submitted");
228 Futures.allAsList(futures).get();
229 Futures.allAsList(deleteFutures).get();
231 TypedActor.get(ACTOR_SYSTEM).stop(master);
232 TypedActor.get(ACTOR_SYSTEM_SLAVE1).stop(slave1);
233 TypedActor.get(ACTOR_SYSTEM_SLAVE2).stop(slave2);
237 // TODO seems like stopping actors is not enough to create an actor with same name, split this into multiple classes?
240 public void testWithDummyOwnershipService() throws Exception {
242 final TestingEntityOwnershipService ownershipService = new TestingEntityOwnershipService();
244 final TopologyManager master = createNoopRoleChangeNode(ACTOR_SYSTEM, TOPOLOGY_NETCONF, true, createRealTopoCallbackFactory(ownershipService));
245 final TopologyManager slave1 = createNoopRoleChangeNode(ACTOR_SYSTEM_SLAVE1, TOPOLOGY_NETCONF, false, createRealTopoCallbackFactory(ownershipService));
246 final TopologyManager slave2 = createNoopRoleChangeNode(ACTOR_SYSTEM_SLAVE2, TOPOLOGY_NETCONF, false, createRealTopoCallbackFactory(ownershipService));
248 await().atMost(10L, TimeUnit.SECONDS).until(new Callable<Boolean>() {
250 public Boolean call() throws Exception {
251 return master.hasAllPeersUp();
255 final List<ListenableFuture<Node>> futures = new ArrayList<>();
256 for (int i = 0; i <= 0; i++) {
257 final String nodeid = "testing-node" + i;
258 final Node testingNode = new NodeBuilder()
259 .setNodeId(new NodeId(nodeid))
260 .addAugmentation(NetconfNode.class,
261 new NetconfNodeBuilder()
262 .setHost(new Host(new IpAddress(new Ipv4Address("127.0.0.1"))))
263 .setPort(new PortNumber(10000 + i))
266 final ListenableFuture<Node> nodeListenableFuture = master.onNodeCreated(new NodeId(nodeid), testingNode);
267 futures.add(nodeListenableFuture);
268 Futures.addCallback(nodeListenableFuture, new FutureCallback<Node>() {
270 public void onSuccess(Node result) {
271 LOG.warn("Node {} created succesfully on all nodes", result.getNodeId().getValue());
275 public void onFailure(Throwable t) {
276 LOG.warn("Node creation failed. ", t);
281 Futures.allAsList(futures).get();
282 ownershipService.distributeOwnership();
285 TypedActor.get(ACTOR_SYSTEM).stop(master);
286 TypedActor.get(ACTOR_SYSTEM_SLAVE1).stop(slave1);
287 TypedActor.get(ACTOR_SYSTEM_SLAVE2).stop(slave2);
290 private TopologyManager createNoopRoleChangeNode(final ActorSystem actorSystem, final String topologyId, final boolean isMaster,
291 final TopologyManagerCallbackFactory topologyManagerCallbackFactory) {
293 final TypedActorExtension typedActorExtension = TypedActor.get(actorSystem);
294 return typedActorExtension.typedActorOf(new TypedProps<>(TopologyManager.class, new Creator<BaseTopologyManager>() {
296 public BaseTopologyManager create() throws Exception {
297 return new BaseTopologyManager(actorSystem,
301 topologyManagerCallbackFactory,
302 new TestingSuccesfulStateAggregator(),
303 new LoggingSalNodeWriter(),
304 new NoopRoleChangeStrategy(),
307 }), TOPOLOGY_NETCONF);
310 private TopologyManager createManagerWithOwnership(final ActorSystem actorSystem, final String topologyId, final boolean isMaster,
311 final TopologyManagerCallbackFactory topologyManagerCallbackFactory, final RoleChangeStrategy roleChangeStrategy) {
312 final TypedActorExtension typedActorExtension = TypedActor.get(actorSystem);
313 return typedActorExtension.typedActorOf(new TypedProps<>(TopologyManager.class, new Creator<BaseTopologyManager>() {
315 public BaseTopologyManager create() throws Exception {
316 return new BaseTopologyManager(actorSystem,
320 topologyManagerCallbackFactory,
321 new NetconfNodeOperationalDataAggregator(),
322 new LoggingSalNodeWriter(),
326 }), TOPOLOGY_NETCONF);
329 private TopologyManagerCallbackFactory createRealTopoTestingNodeCallbackFactory() {
330 final NodeManagerCallbackFactory nodeManagerCallbackFactory = new NodeManagerCallbackFactory() {
332 public NodeManagerCallback create(String nodeId, String topologyId, ActorSystem actorSystem) {
333 return new LoggingNodeManagerCallback();
337 return new TopologyManagerCallbackFactory() {
339 public TopologyManagerCallback create(ActorSystem actorSystem, String topologyId) {
340 return new ExampleTopologyManagerCallback(actorSystem, dataBroker, topologyId, nodeManagerCallbackFactory, new LoggingSalNodeWriter());
345 private TopologyManagerCallbackFactory createRealTopoCallbackFactory(final EntityOwnershipService entityOwnershipService) {
346 final NodeManagerCallbackFactory nodeManagerCallbackFactory = new NodeManagerCallbackFactory() {
348 public NodeManagerCallback create(String nodeId, String topologyId, ActorSystem actorSystem) {
349 return new ExampleNodeManagerCallback();
353 return new TopologyManagerCallbackFactory() {
355 public TopologyManagerCallback create(ActorSystem actorSystem, String topologyId) {
356 return new ExampleTopologyManagerCallback(actorSystem, dataBroker, topologyId, nodeManagerCallbackFactory);
361 private TopologyManagerCallbackFactory createTestingTopoCallbackFactory() {
362 return new TopologyManagerCallbackFactory() {
364 public TopologyManagerCallback create(ActorSystem actorSystem, String topologyId) {
365 return new TestingTopologyManagerCallback();
370 public static class LoggingNodeManagerCallback implements NodeManagerCallback {
374 public Node getInitialState(@Nonnull NodeId nodeId, @Nonnull Node configNode) {
375 final NetconfNode netconfNode = configNode.getAugmentation(NetconfNode.class);
376 return new NodeBuilder()
378 .addAugmentation(NetconfNode.class,
379 new NetconfNodeBuilder()
380 .setHost(netconfNode.getHost())
381 .setPort(netconfNode.getPort())
382 .setConnectionStatus(ConnectionStatus.Connecting)
383 .setClusteredConnectionStatus(
384 new ClusteredConnectionStatusBuilder()
387 new NodeStatusBuilder()
388 .setNode("testing-node")
389 .setStatus(Status.Unavailable)
398 public Node getFailedState(@Nonnull NodeId nodeId, @Nonnull Node configNode) {
399 final NetconfNode netconfNode = configNode.getAugmentation(NetconfNode.class);
400 return new NodeBuilder()
402 .addAugmentation(NetconfNode.class,
403 new NetconfNodeBuilder()
404 .setHost(netconfNode.getHost())
405 .setPort(netconfNode.getPort())
406 .setConnectionStatus(ConnectionStatus.UnableToConnect)
407 .setClusteredConnectionStatus(
408 new ClusteredConnectionStatusBuilder()
410 Collections.singletonList(
411 new NodeStatusBuilder()
412 .setNode("testing-node")
413 .setStatus(Status.Failed)
422 public ListenableFuture<Node> onNodeCreated(@Nonnull NodeId nodeId, @Nonnull Node configNode) {
423 LOG.debug("Creating node {} with config {}", nodeId, configNode);
424 final NetconfNode augmentation = configNode.getAugmentation(NetconfNode.class);
425 return Futures.immediateFuture(new NodeBuilder()
427 .addAugmentation(NetconfNode.class,
428 new NetconfNodeBuilder()
429 .setConnectionStatus(ConnectionStatus.Connected)
430 .setHost(augmentation.getHost())
431 .setPort(augmentation.getPort())
432 .setClusteredConnectionStatus(
433 new ClusteredConnectionStatusBuilder()
435 Collections.singletonList(
436 new NodeStatusBuilder()
437 .setNode("testing-node")
438 .setStatus(Status.Connected)
447 public ListenableFuture<Node> onNodeUpdated(@Nonnull NodeId nodeId, @Nonnull Node configNode) {
448 LOG.debug("Updating node {} with config {}", nodeId, configNode);
449 final NetconfNode augmentation = configNode.getAugmentation(NetconfNode.class);
450 return Futures.immediateFuture(new NodeBuilder()
452 .addAugmentation(NetconfNode.class,
453 new NetconfNodeBuilder()
454 .setConnectionStatus(ConnectionStatus.Connected)
455 .setHost(augmentation.getHost())
456 .setPort(augmentation.getPort())
457 .setClusteredConnectionStatus(
458 new ClusteredConnectionStatusBuilder()
460 Collections.singletonList(
461 new NodeStatusBuilder()
462 .setNode("testing-node")
463 .setStatus(Status.Connected)
472 public ListenableFuture<Void> onNodeDeleted(@Nonnull NodeId nodeId) {
473 LOG.debug("Deleting node {}", nodeId);
474 return Futures.immediateFuture(null);
479 public ListenableFuture<Node> getCurrentStatusForNode(@Nonnull NodeId nodeId) {
484 public void onRoleChanged(RoleChangeDTO roleChangeDTO) {
489 public void onReceive(Object o, ActorRef actorRef) {
494 public void onDeviceConnected(SchemaContext remoteSchemaContext, NetconfSessionPreferences netconfSessionPreferences, DOMRpcService deviceRpc) {
499 public void onDeviceDisconnected() {
504 public void onDeviceFailed(Throwable throwable) {
509 public void onNotification(DOMNotification domNotification) {
514 public void close() {
519 public static class TestingTopologyManagerCallback implements TopologyManagerCallback {
521 public TestingTopologyManagerCallback() {
526 public ListenableFuture<Node> onNodeCreated(NodeId nodeId, Node node) {
527 LOG.warn("Actor system that called this: {}", TypedActor.context().system().settings().toString());
528 return Futures.immediateFuture(new NodeBuilder()
530 .addAugmentation(NetconfNode.class,
531 new NetconfNodeBuilder()
532 .setConnectionStatus(ConnectionStatus.Connected)
533 .setHost(new Host(new IpAddress(new Ipv4Address("127.0.0.1"))))
534 .setPort(new PortNumber(2555))
540 public ListenableFuture<Node> onNodeUpdated(NodeId nodeId, Node node) {
541 LOG.warn("Actor system that called this: {}", TypedActor.context().system().settings().toString());
542 LOG.debug("Update called on node {}, with config {}", nodeId.getValue(), node);
543 return Futures.immediateFuture(new NodeBuilder()
545 .addAugmentation(NetconfNode.class,
546 new NetconfNodeBuilder()
547 .setConnectionStatus(ConnectionStatus.Connected)
548 .setHost(new Host(new IpAddress(new Ipv4Address("127.0.0.1"))))
549 .setPort(new PortNumber(65535))
555 public ListenableFuture<Void> onNodeDeleted(NodeId nodeId) {
556 LOG.debug("Delete called on node {}", nodeId.getValue());
557 return Futures.immediateFuture(null);
562 public ListenableFuture<Node> getCurrentStatusForNode(@Nonnull NodeId nodeId) {
567 public void onRoleChanged(RoleChangeDTO roleChangeDTO) {
572 public void onReceive(Object o, ActorRef actorRef) {
578 public Node getInitialState(@Nonnull NodeId nodeId, @Nonnull Node configNode) {
579 return new NodeBuilder()
581 .addAugmentation(NetconfNode.class,
582 new NetconfNodeBuilder()
583 .setConnectionStatus(ConnectionStatus.Connecting)
584 .setHost(new Host(new IpAddress(new Ipv4Address("127.0.0.1"))))
585 .setPort(new PortNumber(65535))
592 public Node getFailedState(@Nonnull NodeId nodeId, @Nonnull Node configNode) {
593 return new NodeBuilder()
595 .addAugmentation(NetconfNode.class,
596 new NetconfNodeBuilder()
597 .setConnectionStatus(ConnectionStatus.UnableToConnect)
598 .setHost(new Host(new IpAddress(new Ipv4Address("127.0.0.1"))))
599 .setPort(new PortNumber(65535))
605 public class TestingSuccesfulStateAggregator implements StateAggregator {
608 public ListenableFuture<Node> combineCreateAttempts(List<ListenableFuture<Node>> stateFutures) {
609 final SettableFuture<Node> future = SettableFuture.create();
610 final ListenableFuture<List<Node>> allAsList = Futures.allAsList(stateFutures);
611 Futures.addCallback(allAsList, new FutureCallback<List<Node>>() {
613 public void onSuccess(List<Node> result) {
614 for (int i = 0; i < result.size() - 1; i++) {
615 if (!result.get(i).equals(result.get(i + 1))) {
616 LOG.warn("Node 1 {}: {}", result.get(i).getClass(), result.get(i));
617 LOG.warn("Node 2 {}: {}", result.get(i + 1).getClass(), result.get(i + 1));
618 future.setException(new IllegalStateException("Create futures have different result"));
619 LOG.warn("Future1 : {} Future2 : {}", result.get(i), result.get(i+1));
622 future.set(result.get(0));
626 public void onFailure(Throwable t) {
627 LOG.error("One of the combined create attempts failed {}", t);
628 future.setException(t);
630 }, TypedActor.context().dispatcher());
636 public ListenableFuture<Node> combineUpdateAttempts(List<ListenableFuture<Node>> stateFutures) {
637 final SettableFuture<Node> future = SettableFuture.create();
638 final ListenableFuture<List<Node>> allAsList = Futures.allAsList(stateFutures);
639 Futures.addCallback(allAsList, new FutureCallback<List<Node>>() {
641 public void onSuccess(List<Node> result) {
642 for (int i = 0; i < result.size() - 1; i++) {
643 if (!result.get(i).equals(result.get(i + 1))) {
644 future.setException(new IllegalStateException("Update futures have different result"));
647 future.set(result.get(0));
651 public void onFailure(Throwable t) {
652 LOG.error("One of the combined update attempts failed {}", t);
653 future.setException(t);
660 public ListenableFuture<Void> combineDeleteAttempts(List<ListenableFuture<Void>> stateFutures) {
661 final SettableFuture<Void> future = SettableFuture.create();
662 final ListenableFuture<List<Void>> allAsList = Futures.allAsList(stateFutures);
663 Futures.addCallback(allAsList, new FutureCallback<List<Void>>() {
665 public void onSuccess(List<Void> result) {
670 public void onFailure(Throwable t) {
671 LOG.error("One of the combined delete attempts failed {}", t);
672 future.setException(t);