2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.mdsal.singleton.dom.impl;
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static com.google.common.base.Verify.verify;
12 import static com.google.common.base.Verify.verifyNotNull;
13 import static java.util.Objects.requireNonNull;
15 import com.google.common.annotations.VisibleForTesting;
16 import com.google.common.base.Strings;
17 import com.google.common.util.concurrent.FutureCallback;
18 import com.google.common.util.concurrent.Futures;
19 import com.google.common.util.concurrent.ListenableFuture;
20 import com.google.common.util.concurrent.MoreExecutors;
21 import java.util.ArrayList;
22 import java.util.List;
24 import java.util.concurrent.ConcurrentHashMap;
25 import org.eclipse.jdt.annotation.NonNull;
26 import org.opendaylight.mdsal.eos.common.api.CandidateAlreadyRegisteredException;
27 import org.opendaylight.mdsal.eos.common.api.EntityOwnershipStateChange;
28 import org.opendaylight.mdsal.eos.common.api.GenericEntity;
29 import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipListener;
30 import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipService;
31 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
32 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
33 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
34 import org.opendaylight.yangtools.concepts.HierarchicalIdentifier;
35 import org.opendaylight.yangtools.concepts.Registration;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
40 * Abstract class {@link AbstractClusterSingletonServiceProviderImpl} represents implementations of
41 * {@link ClusterSingletonServiceProvider} and it implements {@link GenericEntityOwnershipListener}
42 * for providing OwnershipChange for all registered {@link ClusterSingletonServiceGroup} entity
45 * @param <P> the instance identifier path type
46 * @param <E> the GenericEntity type
47 * @param <G> the GenericEntityOwnershipListener type
48 * @param <S> the GenericEntityOwnershipService type
50 public abstract class AbstractClusterSingletonServiceProviderImpl<P extends HierarchicalIdentifier<P>,
51 E extends GenericEntity<P>, G extends GenericEntityOwnershipListener<E>,
52 S extends GenericEntityOwnershipService<E, G>>
53 implements ClusterSingletonServiceProvider, GenericEntityOwnershipListener<E> {
54 private static final Logger LOG = LoggerFactory.getLogger(AbstractClusterSingletonServiceProviderImpl.class);
57 static final @NonNull String SERVICE_ENTITY_TYPE = "org.opendaylight.mdsal.ServiceEntityType";
59 static final @NonNull String CLOSE_SERVICE_ENTITY_TYPE = "org.opendaylight.mdsal.AsyncServiceCloseEntityType";
61 private final S entityOwnershipService;
62 private final Map<String, ClusterSingletonServiceGroup<P, E>> serviceGroupMap = new ConcurrentHashMap<>();
64 /* EOS Entity Listeners Registration */
65 private Registration serviceEntityListenerReg;
66 private Registration asyncCloseEntityListenerReg;
71 * @param entityOwnershipService relevant EOS
73 protected AbstractClusterSingletonServiceProviderImpl(final @NonNull S entityOwnershipService) {
74 this.entityOwnershipService = requireNonNull(entityOwnershipService);
78 * This method must be called once on startup to initialize this provider.
80 public final void initializeProvider() {
81 LOG.debug("Initialization method for ClusterSingletonService Provider {}", this);
82 serviceEntityListenerReg = registerListener(SERVICE_ENTITY_TYPE, entityOwnershipService);
83 asyncCloseEntityListenerReg = registerListener(CLOSE_SERVICE_ENTITY_TYPE, entityOwnershipService);
87 public final synchronized ClusterSingletonServiceRegistration registerClusterSingletonService(
88 final ClusterSingletonService service) {
89 LOG.debug("Call registrationService {} method for ClusterSingletonService Provider {}", service, this);
91 final String serviceIdentifier = service.getIdentifier().getName();
92 checkArgument(!Strings.isNullOrEmpty(serviceIdentifier),
93 "ClusterSingletonService identifier may not be null nor empty");
95 final ClusterSingletonServiceGroup<P, E> serviceGroup;
96 final var existing = serviceGroupMap.get(serviceIdentifier);
97 if (existing == null) {
98 serviceGroup = createGroup(serviceIdentifier, new ArrayList<>(1));
99 serviceGroupMap.put(serviceIdentifier, serviceGroup);
102 initializeOrRemoveGroup(serviceGroup);
103 } catch (CandidateAlreadyRegisteredException e) {
104 throw new IllegalArgumentException("Service group already registered", e);
107 serviceGroup = existing;
110 final var reg = new AbstractClusterSingletonServiceRegistration(service) {
112 protected void removeRegistration() {
113 // We need to bounce the unregistration through a ordered lock in order not to deal with asynchronous
114 // shutdown of the group and user registering it again.
115 AbstractClusterSingletonServiceProviderImpl.this.removeRegistration(serviceIdentifier, this);
119 serviceGroup.registerService(reg);
123 private ClusterSingletonServiceGroup<P, E> createGroup(final String serviceIdentifier,
124 final List<ClusterSingletonServiceRegistration> services) {
125 return new ClusterSingletonServiceGroupImpl<>(serviceIdentifier, entityOwnershipService,
126 createEntity(SERVICE_ENTITY_TYPE, serviceIdentifier),
127 createEntity(CLOSE_SERVICE_ENTITY_TYPE, serviceIdentifier), services);
130 private void initializeOrRemoveGroup(final ClusterSingletonServiceGroup<P, E> group)
131 throws CandidateAlreadyRegisteredException {
134 } catch (CandidateAlreadyRegisteredException e) {
135 serviceGroupMap.remove(group.getIdentifier(), group);
140 void removeRegistration(final String serviceIdentifier, final ClusterSingletonServiceRegistration reg) {
141 final PlaceholderGroup<P, E> placeHolder;
142 final ListenableFuture<?> future;
143 synchronized (this) {
144 final var lookup = verifyNotNull(serviceGroupMap.get(serviceIdentifier));
145 future = lookup.unregisterService(reg);
146 if (future == null) {
150 // Close the group and replace it with a placeholder
151 LOG.debug("Closing service group {}", serviceIdentifier);
152 placeHolder = new PlaceholderGroup<>(lookup, future);
154 final String identifier = reg.getInstance().getIdentifier().getName();
155 verify(serviceGroupMap.replace(identifier, lookup, placeHolder));
156 LOG.debug("Replaced group {} with {}", serviceIdentifier, placeHolder);
158 lookup.closeClusterSingletonGroup();
161 future.addListener(() -> finishShutdown(placeHolder), MoreExecutors.directExecutor());
164 synchronized void finishShutdown(final PlaceholderGroup<P, E> placeHolder) {
165 final String identifier = placeHolder.getIdentifier();
166 LOG.debug("Service group {} closed", identifier);
168 final var services = placeHolder.getServices();
169 if (services.isEmpty()) {
170 // No services, we are all done
171 if (serviceGroupMap.remove(identifier, placeHolder)) {
172 LOG.debug("Service group {} removed", placeHolder);
174 LOG.debug("Service group {} superseded by {}", placeHolder, serviceGroupMap.get(identifier));
179 // Placeholder is being retired, we are reusing its services as the seed for the group.
180 final var group = createGroup(identifier, services);
181 verify(serviceGroupMap.replace(identifier, placeHolder, group));
182 placeHolder.setSuccessor(group);
183 LOG.debug("Service group upgraded from {} to {}", placeHolder, group);
186 initializeOrRemoveGroup(group);
187 } catch (CandidateAlreadyRegisteredException e) {
188 LOG.error("Failed to register delayed group {}, it will remain inoperational", identifier, e);
193 public final void close() {
194 LOG.debug("Close method for ClusterSingletonService Provider {}", this);
196 if (serviceEntityListenerReg != null) {
197 serviceEntityListenerReg.close();
198 serviceEntityListenerReg = null;
201 final var futures = serviceGroupMap.values().stream()
202 .map(ClusterSingletonServiceGroup::closeClusterSingletonGroup)
204 final var future = Futures.allAsList(futures);
205 Futures.addCallback(future, new FutureCallback<>() {
207 public void onSuccess(final List<Object> result) {
212 public void onFailure(final Throwable throwable) {
213 LOG.warn("Unexpected problem by closing ClusterSingletonServiceProvider {}",
214 AbstractClusterSingletonServiceProviderImpl.this, throwable);
217 }, MoreExecutors.directExecutor());
221 public final void ownershipChanged(final E entity, final EntityOwnershipStateChange change,
222 final boolean inJeopardy) {
223 LOG.debug("Ownership change for ClusterSingletonService Provider on {} {} inJeopardy={}", entity, change,
225 final var serviceIdentifier = getServiceIdentifierFromEntity(entity);
226 final var serviceHolder = serviceGroupMap.get(serviceIdentifier);
227 if (serviceHolder != null) {
228 serviceHolder.ownershipChanged(entity, change, inJeopardy);
230 LOG.debug("ClusterSingletonServiceGroup was not found for serviceIdentifier {}", serviceIdentifier);
235 * Method implementation registers the listener.
237 * @param entityType the type of the entity
238 * @param entityOwnershipServiceInst - EOS type
239 * @return a {@link Registration}
241 protected abstract Registration registerListener(String entityType, S entityOwnershipServiceInst);
244 * Creates an extended {@link GenericEntity} instance.
246 * @param entityType the type of the entity
247 * @param entityIdentifier the identifier of the entity
248 * @return instance of Entity extended GenericEntity type
250 protected abstract E createEntity(String entityType, String entityIdentifier);
253 * Method is responsible for parsing ServiceGroupIdentifier from E entity.
255 * @param entity instance of GenericEntity type
256 * @return ServiceGroupIdentifier parsed from entity key value.
258 protected abstract String getServiceIdentifierFromEntity(E entity);
261 * Method is called async. from close method in end of Provider lifecycle.
263 final void cleanup() {
264 LOG.debug("Final cleaning ClusterSingletonServiceProvider {}", this);
265 if (asyncCloseEntityListenerReg != null) {
266 asyncCloseEntityListenerReg.close();
267 asyncCloseEntityListenerReg = null;
269 serviceGroupMap.clear();