+ return true;
+ }
+
+ /**
+ * Invoke add-group RPC, and put listenable future associated with the RPC into
+ * the given map.
+ *
+ * @param map
+ * The map to store listenable futures associated with add-group RPC.
+ * @param group
+ * The group to add.
+ */
+ private void addGroup(Map<Long, ListenableFuture<?>> map, Group group) {
+ KeyedInstanceIdentifier<Group, GroupKey> groupIdent = nodeIdentity.child(Group.class, group.getKey());
+ final Long groupId = group.getGroupId().getValue();
+ ListenableFuture<?> future = JdkFutureAdapters
+ .listenInPoolThread(provider.getGroupCommiter().add(groupIdent, group, nodeIdentity));
+
+ Futures.addCallback(future, new FutureCallback<Object>() {
+ @Override
+ public void onSuccess(Object result) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("add-group RPC completed: node={}, id={}",
+ nodeIdentity.firstKeyOf(Node.class).getId().getValue(), groupId);
+ }
+ }
+
+ @Override
+ public void onFailure(Throwable cause) {
+ String msg = "add-group RPC failed: node=" + nodeIdentity.firstKeyOf(Node.class).getId().getValue()
+ + ", id=" + groupId;
+ LOG.debug(msg, cause);
+ }
+ });
+
+ map.put(groupId, future);
+ }
+
+ /**
+ * Wait for completion of add-group RPC.
+ *
+ * @param nodeId
+ * The identifier for the target node.
+ * @param future
+ * Future associated with add-group RPC that installs the target
+ * group.
+ */
+ private void awaitGroup(String nodeId, ListenableFuture<?> future) {
+ awaitGroups(nodeId, Collections.singleton(future));
+ }
+
+ /**
+ * Wait for completion of add-group RPCs.
+ *
+ * @param nodeId
+ * The identifier for the target node.
+ * @param futures
+ * A collection of futures associated with add-group RPCs.
+ */
+ private void awaitGroups(String nodeId, Collection<ListenableFuture<?>> futures) {
+ if (!futures.isEmpty()) {
+ long timeout = Math.min(ADD_GROUP_TIMEOUT * futures.size(), MAX_ADD_GROUP_TIMEOUT);
+ try {
+ Futures.successfulAsList(futures).get(timeout, TimeUnit.NANOSECONDS);
+ LOG.trace("awaitGroups() completed: node={}", nodeId);
+ } catch (TimeoutException | InterruptedException | ExecutionException e) {
+ LOG.debug("add-group RPCs did not complete: node={}", nodeId);
+ }
+ }