2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.netconf.topology.util;
11 import akka.actor.ActorContext;
12 import akka.actor.ActorIdentity;
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSystem;
15 import akka.actor.Address;
16 import akka.actor.Identify;
17 import akka.actor.TypedActor;
18 import akka.actor.TypedActorExtension;
19 import akka.actor.TypedProps;
20 import akka.cluster.Cluster;
21 import akka.cluster.ClusterEvent;
22 import akka.cluster.ClusterEvent.MemberEvent;
23 import akka.cluster.ClusterEvent.MemberExited;
24 import akka.cluster.ClusterEvent.MemberRemoved;
25 import akka.cluster.ClusterEvent.MemberUp;
26 import akka.cluster.ClusterEvent.ReachableMember;
27 import akka.cluster.ClusterEvent.UnreachableMember;
28 import akka.cluster.Member;
29 import akka.dispatch.OnComplete;
30 import com.google.common.base.Optional;
31 import com.google.common.util.concurrent.CheckedFuture;
32 import com.google.common.util.concurrent.FutureCallback;
33 import com.google.common.util.concurrent.Futures;
34 import com.google.common.util.concurrent.ListenableFuture;
35 import com.google.common.util.concurrent.SettableFuture;
36 import java.util.ArrayList;
37 import java.util.HashMap;
38 import java.util.HashSet;
40 import java.util.Map.Entry;
41 import java.util.Random;
43 import java.util.concurrent.TimeUnit;
44 import javax.annotation.Nonnull;
45 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
46 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
47 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
48 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
49 import org.opendaylight.netconf.topology.RoleChangeStrategy;
50 import org.opendaylight.netconf.topology.StateAggregator;
51 import org.opendaylight.netconf.topology.TopologyManager;
52 import org.opendaylight.netconf.topology.TopologyManagerCallback;
53 import org.opendaylight.netconf.topology.TopologyManagerCallback.TopologyManagerCallbackFactory;
54 import org.opendaylight.netconf.topology.util.messages.CustomIdentifyMessage;
55 import org.opendaylight.netconf.topology.util.messages.CustomIdentifyMessageReply;
56 import org.opendaylight.netconf.topology.util.messages.NormalizedNodeMessage;
57 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
58 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
59 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
60 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
61 import org.opendaylight.yangtools.binding.data.codec.impl.BindingNormalizedNodeCodecRegistry;
62 import org.opendaylight.yangtools.yang.binding.DataObject;
63 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
64 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
65 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
66 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
67 import org.slf4j.Logger;
68 import org.slf4j.LoggerFactory;
69 import scala.concurrent.Future;
70 import scala.concurrent.duration.FiniteDuration;
71 import scala.concurrent.impl.Promise.DefaultPromise;
73 public final class BaseTopologyManager
74 implements TopologyManager {
// Class-wide SLF4J logger.
76 private static final Logger LOG = LoggerFactory.getLogger(BaseTopologyManager.class);
// Datastore path of this topology's list entry; built from topologyId in the constructor and read in resyncPeer.
78 private final KeyedInstanceIdentifier<Topology, TopologyKey> topologyListPath;
// Actor system passed to the constructor; the typed-actor and cluster extensions below are obtained from it.
80 private final ActorSystem system;
// Typed-actor extension (TypedActor.get(system)); used in onReceive to build TopologyManager proxies for peers.
81 private final TypedActorExtension typedExtension;
// Cluster extension (Cluster.get(system)); used for membership subscription, join/leave and the self address.
82 private final Cluster clusterExtension;
// Converts between binding objects (Node) and NormalizedNode form for messages exchanged with peers.
84 private final BindingNormalizedNodeCodecRegistry codecRegistry;
// Path segment placed between a member address and the topology id when building actor paths.
86 private static final String PATH = "/user/";
// Source of the read-only transaction used by resyncPeer.
88 private final DataBroker dataBroker;
// Receives this actor as a role candidate in preStart.
89 private final RoleChangeStrategy roleChangeStrategy;
// Combines the per-manager futures of create/update/delete attempts into one result.
90 private final StateAggregator aggregator;
// Receives the aggregated node state (init/update/delete calls in the callbacks below).
92 private final NodeWriter naSalNodeWriter;
// Identifier of the topology this manager serves; also the last segment of peer actor paths.
93 private final String topologyId;
// Local handler created by the callback factory; per-node operations are delegated to it.
94 private final TopologyManagerCallback delegateTopologyHandler;
// Node ids tracked by this manager: consulted in onNodeCreated, removed in onNodeDeleted.
95 private final Set<NodeId> created = new HashSet<>();
// Peer managers keyed by cluster address; maintained by onReceive.
97 private final Map<Address, TopologyManager> peers = new HashMap<>();
// Random per-instance number; in the visible code it is only used to tag log messages.
98 private final int id = new Random().nextInt();
// Current role flag; set by the constructor and overwritten in onRoleChanged.
100 private boolean isMaster;
/**
 * Creates a topology manager whose initial role is "not master".
 *
 * <p>Simply forwards every argument to the nine-argument constructor and passes
 * {@code false} for its {@code isMaster} flag.
 */
public BaseTopologyManager(final ActorSystem system,
                           final BindingNormalizedNodeCodecRegistry codecRegistry,
                           final DataBroker dataBroker,
                           final String topologyId,
                           final TopologyManagerCallbackFactory topologyManagerCallbackFactory,
                           final StateAggregator aggregator,
                           final NodeWriter naSalNodeWriter,
                           final RoleChangeStrategy roleChangeStrategy) {
    this(system,
            codecRegistry,
            dataBroker,
            topologyId,
            topologyManagerCallbackFactory,
            aggregator,
            naSalNodeWriter,
            roleChangeStrategy,
            false);
}
/**
 * Creates a topology manager with an explicit initial role.
 *
 * <p>Stores the collaborators, obtains the typed-actor and cluster extensions from
 * {@code system}, asks the callback factory for the local topology handler and derives
 * the datastore path of the topology list entry from {@code topologyId}.
 *
 * @param system actor system this manager runs in
 * @param codecRegistry codec used to convert nodes to and from their normalized form
 * @param dataBroker broker used to read the topology from the datastore
 * @param topologyId identifier of the managed topology
 * @param topologyManagerCallbackFactory factory for the local topology handler
 * @param aggregator combines results gathered from this manager and its peers
 * @param naSalNodeWriter writer that receives the aggregated node state
 * @param roleChangeStrategy strategy this manager registers with in {@code preStart}
 * @param isMaster initial value of the master flag
 */
public BaseTopologyManager(final ActorSystem system,
                           final BindingNormalizedNodeCodecRegistry codecRegistry,
                           final DataBroker dataBroker,
                           final String topologyId,
                           final TopologyManagerCallbackFactory topologyManagerCallbackFactory,
                           final StateAggregator aggregator,
                           final NodeWriter naSalNodeWriter,
                           final RoleChangeStrategy roleChangeStrategy,
                           final boolean isMaster) {

    this.system = system;
    this.typedExtension = TypedActor.get(system);
    this.clusterExtension = Cluster.get(system);
    this.dataBroker = dataBroker;
    this.topologyId = topologyId;
    this.delegateTopologyHandler = topologyManagerCallbackFactory.create(system, topologyId);
    this.aggregator = aggregator;
    this.naSalNodeWriter = naSalNodeWriter;
    this.roleChangeStrategy = roleChangeStrategy;
    this.codecRegistry = codecRegistry;

    // election has not yet happened
    this.isMaster = isMaster;

    this.topologyListPath = TopologyUtil.createTopologyListPath(topologyId);

    // The message previously had no "{}" placeholder, so the instance id was silently dropped.
    LOG.debug("Base manager started {}", id);
}
143 public void preStart() {
144 LOG.debug("preStart called");
145 // TODO change to enum, master/slave active/standby
146 roleChangeStrategy.registerRoleCandidate(TypedActor.<BaseTopologyManager>self());
147 LOG.debug("candidate registered");
148 clusterExtension.subscribe(TypedActor.context().self(), ClusterEvent.initialStateAsEvents(), MemberEvent.class, UnreachableMember.class);
152 public void postStop() {
153 LOG.debug("postStop called");
154 clusterExtension.leave(clusterExtension.selfAddress());
155 clusterExtension.unsubscribe(TypedActor.context().self());
/**
 * Handles creation of a topology node.
 *
 * <p>If the node id is already tracked in {@code created}, the call is turned into an update.
 * Otherwise the visible code collects one future from the local delegate plus one per peer
 * (each peer is called with the node in normalized form), combines them through the
 * aggregator and writes the outcome via the node writer. The final statement delegates to
 * the local handler only.
 *
 * NOTE(review): the condition that separates the aggregating path from the final
 * delegate-only path is not visible in this view; the inline comments suggest it is the
 * master/slave distinction - confirm against the full file.
 *
 * @param nodeId id of the node being created
 * @param node configuration of the node being created
 * @return future of the resulting node state
 */
159 public ListenableFuture<Node> onNodeCreated(final NodeId nodeId, final Node node) {
160 LOG.debug("TopologyManager({}) onNodeCreated received, nodeid: {} , isMaster: {}", id, nodeId.getValue(), isMaster);
// A node that is already tracked is handled as an update instead of a second create.
162 if (created.contains(nodeId)) {
163 LOG.warn("Node{} already exists, triggering update..", nodeId);
164 return onNodeUpdated(nodeId, node);
167 final ArrayList<ListenableFuture<Node>> futures = new ArrayList<>();
// The local delegate's attempt is always the first entry of the list.
171 futures.add(delegateTopologyHandler.onNodeCreated(nodeId, node));
172 // only master should call connect on peers and aggregate futures
173 for (TopologyManager topologyManager : peers.values()) {
174 // convert binding into NormalizedNode for transfer
175 final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> normalizedNodeEntry = codecRegistry.toNormalizedNode(TopologyUtil.createTopologyNodePath(topologyId), node);
177 LOG.debug("YangInstanceIdentifier {}", normalizedNodeEntry.getKey());
178 LOG.debug("Value {}", normalizedNodeEntry.getValue());
180 // add a future into our futures that gets its completion status from the converted scala future
181 final SettableFuture<Node> settableFuture = SettableFuture.create();
182 futures.add(settableFuture);
183 final Future<NormalizedNodeMessage> scalaFuture = topologyManager.onRemoteNodeCreated(new NormalizedNodeMessage(normalizedNodeEntry.getKey(), normalizedNodeEntry.getValue()));
// Bridge the peer's Scala future into the Guava future added above; the callback is run on the typed actor context's dispatcher.
184 scalaFuture.onComplete(new OnComplete<NormalizedNodeMessage>() {
186 public void onComplete(Throwable failure, NormalizedNodeMessage success) throws Throwable {
187 if (failure != null) {
188 settableFuture.setException(failure);
// Convert the peer's normalized reply back into a binding Node before completing the future.
191 final Entry<InstanceIdentifier<?>, DataObject> fromNormalizedNode =
192 codecRegistry.fromNormalizedNode(success.getIdentifier(), success.getNode());
193 final Node value = (Node) fromNormalizedNode.getValue();
195 settableFuture.set(value);
197 }, TypedActor.context().dispatcher());
// Combine the local and peer attempts into a single result.
200 final ListenableFuture<Node> aggregatedFuture = aggregator.combineCreateAttempts(futures);
201 Futures.addCallback(aggregatedFuture, new FutureCallback<Node>() {
203 public void onSuccess(final Node result) {
204 LOG.debug("Futures aggregated succesfully");
// Hand the combined node state to the writer as the initial state.
205 naSalNodeWriter.init(nodeId, result);
209 public void onFailure(final Throwable t) {
210 // If the combined connection attempt failed, set the node to connection failed
211 LOG.debug("Futures aggregation failed");
212 naSalNodeWriter.update(nodeId, delegateTopologyHandler.getFailedState(nodeId, node));
214 }, TypedActor.context().dispatcher());
216 //combine peer futures
217 return aggregatedFuture;
220 // trigger create on this slave
221 return delegateTopologyHandler.onNodeCreated(nodeId, node);
/**
 * Handles an update of a topology node.
 *
 * <p>The visible code implements the update as delete-then-create: it calls
 * {@code onNodeDeleted}, and once that future succeeds it calls {@code onNodeCreated}
 * through this actor's own typed-actor proxy and mirrors that result into
 * {@code createFuture}. The final statement delegates the update to the local handler only.
 *
 * NOTE(review): the condition selecting between the two paths, and the statement that
 * returns {@code createFuture}, are not visible in this view - confirm against the full file.
 *
 * @param nodeId id of the node being updated
 * @param node new configuration of the node
 * @return future of the resulting node state
 */
225 public ListenableFuture<Node> onNodeUpdated(final NodeId nodeId, final Node node) {
226 LOG.debug("TopologyManager({}) onNodeUpdated received, nodeid: {}", id, nodeId.getValue());
228 // Master needs to trigger onNodeUpdated on peers and combine results
230 // first cleanup old node
231 final ListenableFuture<Void> deleteFuture = onNodeDeleted(nodeId);
232 final SettableFuture<Node> createFuture = SettableFuture.create();
// Captured before the callbacks are registered so the callbacks below can use them.
233 final TopologyManager selfProxy = TypedActor.self();
234 final ActorContext context = TypedActor.context();
235 Futures.addCallback(deleteFuture, new FutureCallback<Void>() {
237 public void onSuccess(Void result) {
238 LOG.warn("Delete part of update succesfull, triggering create");
239 // trigger create on all nodes
240 Futures.addCallback(selfProxy.onNodeCreated(nodeId, node), new FutureCallback<Node>() {
242 public void onSuccess(Node result) {
243 createFuture.set(result);
247 public void onFailure(Throwable t) {
248 createFuture.setException(t);
250 }, context.dispatcher());
// NOTE(review): in the lines visible here a failed delete is only logged and createFuture is
// not completed on this path - confirm that callers cannot wait on it forever.
254 public void onFailure(Throwable t) {
255 LOG.warn("Delete part of update failed, {}", t);
257 }, context.dispatcher());
261 // Trigger update on this slave
262 return delegateTopologyHandler.onNodeUpdated(nodeId, node);
/**
 * Handles deletion of a topology node.
 *
 * <p>Stops tracking the node id, then (in the visible aggregating path) collects one delete
 * future from the local delegate plus one per peer, combines them through the aggregator and
 * asks the node writer to delete the node once the combined future succeeds. The final
 * statement delegates the delete to the local handler only.
 *
 * NOTE(review): the condition separating the aggregating path from the delegate-only path
 * and the body of the aggregated onFailure callback are not visible in this view.
 *
 * @param nodeId id of the node being deleted
 * @return future completing when the delete has been processed
 */
266 public ListenableFuture<Void> onNodeDeleted(final NodeId nodeId) {
267 final ArrayList<ListenableFuture<Void>> futures = new ArrayList<>();
// Forget the node first so a later create for the same id is not treated as an update.
268 created.remove(nodeId);
270 // Master needs to trigger delete on peers and combine results
272 futures.add(delegateTopologyHandler.onNodeDeleted(nodeId));
273 for (TopologyManager topologyManager : peers.values()) {
274 // add a future into our futures that gets its completion status from the converted scala future
275 final SettableFuture<Void> settableFuture = SettableFuture.create();
276 futures.add(settableFuture);
277 final Future<Void> scalaFuture = topologyManager.onRemoteNodeDeleted(nodeId);
// Bridge the peer's Scala future into the Guava future added above.
278 scalaFuture.onComplete(new OnComplete<Void>() {
280 public void onComplete(Throwable failure, Void success) throws Throwable {
281 if (failure != null) {
282 settableFuture.setException(failure);
286 settableFuture.set(success);
288 }, TypedActor.context().dispatcher());
// Combine the local and peer delete attempts into a single result.
291 final ListenableFuture<Void> aggregatedFuture = aggregator.combineDeleteAttempts(futures);
292 Futures.addCallback(aggregatedFuture, new FutureCallback<Void>() {
294 public void onSuccess(final Void result) {
295 naSalNodeWriter.delete(nodeId);
299 public void onFailure(final Throwable t) {
304 return aggregatedFuture;
308 return delegateTopologyHandler.onNodeDeleted(nodeId);
313 public ListenableFuture<Node> getCurrentStatusForNode(@Nonnull final NodeId nodeId) {
314 return delegateTopologyHandler.getCurrentStatusForNode(nodeId);
/**
 * Reacts to a role change: updates the master flag from the DTO's ownership value and
 * forwards the change to the local topology handler.
 *
 * NOTE(review): a line between the delegate call and the log statement is not visible in
 * this view. As shown, the "is master now" log and the cluster join would run on every role
 * change - confirm whether they are guarded by the master flag in the full file.
 *
 * @param roleChangeDTO description of the role change
 */
318 public void onRoleChanged(final RoleChangeDTO roleChangeDTO) {
319 isMaster = roleChangeDTO.isOwner();
320 delegateTopologyHandler.onRoleChanged(roleChangeDTO);
322 LOG.debug("Node {} is master now", clusterExtension.selfAddress());
// Joins the cluster using this member's own address as the join target.
323 clusterExtension.join(clusterExtension.selfAddress());
328 public Future<Boolean> isMaster() {
329 return new DefaultPromise<Boolean>().success(isMaster).future();
/**
 * Handles a connection-status change of a node.
 *
 * <p>The first visible path gathers the node's current status from the local delegate and
 * from every peer, combines the results through the aggregator and writes the outcome via
 * the node writer. The second visible path ("Not master, forwarding..") asks every peer
 * whether it is master and forwards the notification to a peer that answers true.
 *
 * NOTE(review): the condition selecting between the two paths is not visible in this view;
 * the log message suggests it is the master flag - confirm against the full file.
 *
 * @param nodeId id of the node whose status changed
 */
333 public void notifyNodeStatusChange(final NodeId nodeId) {
334 LOG.debug("Connection status has changed on node {}", nodeId.getValue());
336 // grab status from all peers and aggregate
337 final ArrayList<ListenableFuture<Node>> futures = new ArrayList<>();
338 futures.add(delegateTopologyHandler.getCurrentStatusForNode(nodeId));
339 // only master should call connect on peers and aggregate futures
340 for (TopologyManager topologyManager : peers.values()) {
341 // add a future into our futures that gets its completion status from the converted scala future
342 final SettableFuture<Node> settableFuture = SettableFuture.create();
343 futures.add(settableFuture);
344 final Future<NormalizedNodeMessage> scalaFuture = topologyManager.remoteGetCurrentStatusForNode(nodeId);
// Bridge the peer's Scala future into the Guava future added above.
345 scalaFuture.onComplete(new OnComplete<NormalizedNodeMessage>() {
347 public void onComplete(Throwable failure, NormalizedNodeMessage success) throws Throwable {
348 if (failure != null) {
349 settableFuture.setException(failure);
// Convert the peer's normalized reply back into a binding Node before completing the future.
352 final Entry<InstanceIdentifier<?>, DataObject> fromNormalizedNode =
353 codecRegistry.fromNormalizedNode(success.getIdentifier(), success.getNode());
354 final Node value = (Node) fromNormalizedNode.getValue();
356 settableFuture.set(value);
358 }, TypedActor.context().dispatcher());
// Combine the local and peer statuses into a single result.
361 final ListenableFuture<Node> aggregatedFuture = aggregator.combineUpdateAttempts(futures);
362 Futures.addCallback(aggregatedFuture, new FutureCallback<Node>() {
364 public void onSuccess(final Node result) {
365 LOG.debug("Futures aggregated succesfully");
366 naSalNodeWriter.update(nodeId, result);
370 public void onFailure(final Throwable t) {
371 // If the combined connection attempt failed, set the node to connection failed
372 LOG.debug("Futures aggregation failed");
// No node configuration is available on this path, so null is passed as the node argument.
373 naSalNodeWriter.update(nodeId, delegateTopologyHandler.getFailedState(nodeId, null));
378 LOG.debug("Not master, forwarding..");
379 for (final TopologyManager manager : peers.values()) {
380 // asynchronously find out which peer is master
381 final Future<Boolean> future = manager.isMaster();
382 future.onComplete(new OnComplete<Boolean>() {
384 public void onComplete(Throwable failure, Boolean success) throws Throwable {
385 if (failure == null && success) {
386 LOG.debug("Found master peer");
// Forward the notification to the peer that reported itself as master.
388 manager.notifyNodeStatusChange(nodeId);
391 if (failure != null) {
392 LOG.debug("Retrieving master peer failed, {}", failure);
395 }, TypedActor.context().dispatcher());
400 public boolean hasAllPeersUp() {
401 LOG.debug("Peers needed: {} Peers up: {}", 2, peers.size());
402 LOG.warn(clusterExtension.state().toString());
403 LOG.warn(peers.toString());
404 return peers.size() == 2;
408 public Future<NormalizedNodeMessage> onRemoteNodeCreated(final NormalizedNodeMessage message) {
409 final Entry<InstanceIdentifier<?>, DataObject> fromNormalizedNode =
410 codecRegistry.fromNormalizedNode(message.getIdentifier(), message.getNode());
411 final InstanceIdentifier<Node> iid = (InstanceIdentifier<Node>) fromNormalizedNode.getKey();
412 final Node value = (Node) fromNormalizedNode.getValue();
414 LOG.debug("TopologyManager({}) onRemoteNodeCreated received, nodeid: {}", value.getNodeId(), value);
415 final ListenableFuture<Node> nodeListenableFuture = onNodeCreated(value.getNodeId(), value);
416 final DefaultPromise<NormalizedNodeMessage> promise = new DefaultPromise<>();
417 Futures.addCallback(nodeListenableFuture, new FutureCallback<Node>() {
419 public void onSuccess(Node result) {
420 final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> entry = codecRegistry.toNormalizedNode(iid, result);
421 promise.success(new NormalizedNodeMessage(entry.getKey(), entry.getValue()));
425 public void onFailure(Throwable t) {
430 return promise.future();
434 public Future<NormalizedNodeMessage> onRemoteNodeUpdated(final NormalizedNodeMessage message) {
435 final Entry<InstanceIdentifier<?>, DataObject> fromNormalizedNode =
436 codecRegistry.fromNormalizedNode(message.getIdentifier(), message.getNode());
437 final InstanceIdentifier<Node> iid = (InstanceIdentifier<Node>) fromNormalizedNode.getKey();
438 final Node value = (Node) fromNormalizedNode.getValue();
440 LOG.debug("TopologyManager({}) onRemoteNodeUpdated received, nodeid: {}", id, value.getNodeId());
442 final ListenableFuture<Node> nodeListenableFuture = onNodeUpdated(value.getNodeId(), value);
443 final DefaultPromise<NormalizedNodeMessage> promise = new DefaultPromise<>();
444 Futures.addCallback(nodeListenableFuture, new FutureCallback<Node>() {
446 public void onSuccess(Node result) {
447 final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> entry = codecRegistry.toNormalizedNode(iid, result);
448 promise.success(new NormalizedNodeMessage(entry.getKey(), entry.getValue()));
452 public void onFailure(Throwable t) {
456 return promise.future();
460 public Future<Void> onRemoteNodeDeleted(final NodeId nodeId) {
461 LOG.debug("TopologyManager({}) onRemoteNodeDeleted received, nodeid: {}", id, nodeId.getValue());
463 final ListenableFuture<Void> listenableFuture = onNodeDeleted(nodeId);
464 final DefaultPromise<Void> promise = new DefaultPromise<>();
465 Futures.addCallback(listenableFuture, new FutureCallback<Void>() {
467 public void onSuccess(Void result) {
468 promise.success(null);
472 public void onFailure(Throwable t) {
477 return promise.future();
480 public Future<NormalizedNodeMessage> remoteGetCurrentStatusForNode(final NodeId nodeId) {
481 LOG.debug("TopologyManager({}) remoteGetCurrentStatusForNode received, nodeid: {}", id, nodeId.getValue());
483 final ListenableFuture<Node> listenableFuture = getCurrentStatusForNode(nodeId);
484 final DefaultPromise<NormalizedNodeMessage> promise = new DefaultPromise<>();
485 Futures.addCallback(listenableFuture, new FutureCallback<Node>() {
487 public void onSuccess(Node result) {
488 final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> entry = codecRegistry.toNormalizedNode(TopologyUtil.createTopologyNodePath(topologyId), result);
489 promise.success(new NormalizedNodeMessage(entry.getKey(), entry.getValue()));
493 public void onFailure(Throwable t) {
497 return promise.future();
/**
 * Handles untyped messages delivered to this typed actor: cluster membership events and the
 * identify handshake used to discover peer topology managers.
 *
 * <p>Visible behaviour per message type:
 * MemberUp / ReachableMember - send an Identify to the topology actor on that member;
 * MemberExited / MemberRemoved / UnreachableMember - drop the member from the peer map;
 * ActorIdentity - retry the Identify after five seconds when no actor was resolved, and send
 * a CustomIdentifyMessage to the resolved path;
 * CustomIdentifyMessageReply / CustomIdentifyMessage - register the sender as a peer if it is
 * not known yet (and, for CustomIdentifyMessage, answer with a reply).
 *
 * NOTE(review): several branch bodies and closing lines are not visible in this view (for
 * example what happens when the member address equals the self address, and what follows
 * peer registration) - confirm against the full file.
 *
 * @param message the received message
 * @param actorRef sender of the message
 */
501 public void onReceive(final Object message, final ActorRef actorRef) {
502 LOG.debug("message received {}", message);
503 if (message instanceof MemberUp) {
504 final Member member = ((MemberUp) message).member();
505 LOG.info("Member is Up: {}", member);
// NOTE(review): body of this self-address check is not visible; presumably the actor skips itself - confirm.
506 if (member.address().equals(clusterExtension.selfAddress())) {
// Actor path of the peer's topology manager: member address + "/user/" + topology id.
510 final String path = member.address() + PATH + topologyId;
511 LOG.debug("Actor at :{} is resolving topology actor for path {}", clusterExtension.selfAddress(), path);
513 // first send basic identify message in case our messages have not been loaded through osgi yet to prevent crashing akka.
// The member address is used as the correlation id so the ActorIdentity branch can rebuild the path.
514 clusterExtension.system().actorSelection(path).tell(new Identify(member.address()), TypedActor.context().self());
515 } else if (message instanceof MemberExited) {
517 final Member member = ((MemberExited) message).member();
518 LOG.info("Member exited cluster: {}", member);
519 peers.remove(member.address());
520 } else if (message instanceof MemberRemoved) {
522 final Member member = ((MemberRemoved) message).member();
523 LOG.info("Member was removed from cluster: {}", member);
524 peers.remove(member.address());
525 } else if (message instanceof UnreachableMember) {
527 final Member member = ((UnreachableMember) message).member();
528 LOG.info("Member is unreachable: {}", member);
529 peers.remove(member.address());
530 } else if (message instanceof ReachableMember) {
532 final Member member = ((ReachableMember) message).member();
533 LOG.info("Member is reachable again: {}", member);
// NOTE(review): body of this self-address check is not visible; presumably the actor skips itself - confirm.
535 if (member.address().equals(clusterExtension.selfAddress())) {
539 final String path = member.address() + PATH + topologyId;
540 LOG.debug("Actor at :{} is resolving topology actor for path {}", clusterExtension.selfAddress(), path);
542 clusterExtension.system().actorSelection(path).tell(new Identify(member.address()), TypedActor.context().self());
543 } else if (message instanceof ActorIdentity) {
// NOTE(review): this log call and the "retrying" one below pass an argument but have no "{}" placeholder, so the message is not printed.
544 LOG.debug("Received ActorIdentity message", message);
// The correlation id is the member address that was put into the Identify message.
545 final String path = ((ActorIdentity) message).correlationId() + PATH + topologyId;
546 if (((ActorIdentity) message).getRef() == null) {
547 LOG.debug("ActorIdentity has null actor ref, retrying..", message);
// Captured here because the scheduled task below uses them when it runs later.
548 final ActorRef self = TypedActor.context().self();
549 final ActorContext context = TypedActor.context();
// Re-send the Identify once, five seconds from now, on the actor system's dispatcher.
550 system.scheduler().scheduleOnce(new FiniteDuration(5, TimeUnit.SECONDS), new Runnable() {
553 LOG.debug("Retrying identify message from master to node {} , full path {}", ((ActorIdentity) message).correlationId(), path);
554 context.system().actorSelection(path).tell(new Identify(((ActorIdentity) message).correlationId()), self);
557 }, system.dispatcher());
560 LOG.debug("Actor at :{} is resolving topology actor for path {}, with a custom message", clusterExtension.selfAddress(), path);
// Second handshake step: announce this manager's own address to the resolved topology actor.
562 clusterExtension.system().actorSelection(path).tell(new CustomIdentifyMessage(clusterExtension.selfAddress()), TypedActor.context().self());
563 } else if (message instanceof CustomIdentifyMessageReply) {
565 LOG.warn("Received a custom identify reply message from: {}", ((CustomIdentifyMessageReply) message).getAddress());
// NOTE(review): the cast below targets CustomIdentifyMessage although the branch tested for
// CustomIdentifyMessageReply; this only works if the reply type is a subtype - confirm.
566 if (!peers.containsKey(((CustomIdentifyMessage) message).getAddress())) {
// Wrap the sender's actor reference in a typed TopologyManager proxy and remember it as a peer.
567 final TopologyManager peer = typedExtension.typedActorOf(new TypedProps<>(TopologyManager.class, BaseTopologyManager.class), actorRef);
568 peers.put(((CustomIdentifyMessageReply) message).getAddress(), peer);
573 } else if (message instanceof CustomIdentifyMessage) {
574 LOG.warn("Received a custom identify message from: {}", ((CustomIdentifyMessage) message).getAddress());
575 if (!peers.containsKey(((CustomIdentifyMessage) message).getAddress())) {
// Wrap the sender's actor reference in a typed TopologyManager proxy and remember it as a peer.
576 final TopologyManager peer = typedExtension.typedActorOf(new TypedProps<>(TopologyManager.class, BaseTopologyManager.class), actorRef);
577 peers.put(((CustomIdentifyMessage) message).getAddress(), peer);
// Answer the sender with this manager's own address.
582 actorRef.tell(new CustomIdentifyMessageReply(clusterExtension.selfAddress()), TypedActor.context().self());
586 private void resyncPeer(final TopologyManager peer) {
587 final ReadOnlyTransaction rTx = dataBroker.newReadOnlyTransaction();
588 final CheckedFuture<Optional<Topology>, ReadFailedException> read = rTx.read(LogicalDatastoreType.CONFIGURATION, topologyListPath);
590 Futures.addCallback(read, new FutureCallback<Optional<Topology>>() {
592 public void onSuccess(Optional<Topology> result) {
593 if (result.isPresent() && result.get().getNode() != null) {
594 for (final Node node : result.get().getNode()) {
595 final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> entry = codecRegistry.toNormalizedNode(TopologyUtil.createTopologyNodePath(topologyId), node);
596 peer.onRemoteNodeCreated(new NormalizedNodeMessage(entry.getKey(), entry.getValue()));
597 // we dont care about the future from now on since we will be notified by the onConnected event
603 public void onFailure(Throwable t) {
604 LOG.error("Unable to read from datastore");