Rework ClusterSingletonServiceGroupImpl locking
[mdsal.git] / singleton-service / mdsal-singleton-dom-impl / src / main / java / org / opendaylight / mdsal / singleton / dom / impl / AbstractClusterSingletonServiceProviderImpl.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.mdsal.singleton.dom.impl;
10
11 import static com.google.common.base.Verify.verify;
12 import static com.google.common.base.Verify.verifyNotNull;
13
14 import com.google.common.annotations.VisibleForTesting;
15 import com.google.common.base.Preconditions;
16 import com.google.common.base.Strings;
17 import com.google.common.base.Verify;
18 import com.google.common.util.concurrent.FutureCallback;
19 import com.google.common.util.concurrent.Futures;
20 import com.google.common.util.concurrent.ListenableFuture;
21 import com.google.common.util.concurrent.MoreExecutors;
22 import java.util.ArrayList;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.concurrent.ConcurrentHashMap;
26 import javax.annotation.CheckForNull;
27 import javax.annotation.Nonnull;
28 import org.opendaylight.mdsal.eos.common.api.CandidateAlreadyRegisteredException;
29 import org.opendaylight.mdsal.eos.common.api.GenericEntity;
30 import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipChange;
31 import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipListener;
32 import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipListenerRegistration;
33 import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipService;
34 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
35 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
36 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
37 import org.opendaylight.yangtools.concepts.Path;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40
41 /**
42  * Abstract class {@link AbstractClusterSingletonServiceProviderImpl} represents implementations of
43  * {@link ClusterSingletonServiceProvider} and it implements {@link GenericEntityOwnershipListener}
44  * for providing OwnershipChange for all registered {@link ClusterSingletonServiceGroup} entity
45  * candidate.
46  *
47  * @param <P> the instance identifier path type
48  * @param <E> the GenericEntity type
49  * @param <C> the GenericEntityOwnershipChange type
50  * @param <G> the GenericEntityOwnershipListener type
51  * @param <S> the GenericEntityOwnershipService type
52  * @param <R> the GenericEntityOwnershipListenerRegistration type
53  */
54 public abstract class AbstractClusterSingletonServiceProviderImpl<P extends Path<P>, E extends GenericEntity<P>,
55         C extends GenericEntityOwnershipChange<P, E>,
56         G extends GenericEntityOwnershipListener<P, C>,
57         S extends GenericEntityOwnershipService<P, E, G>,
58         R extends GenericEntityOwnershipListenerRegistration<P, G>>
59         implements ClusterSingletonServiceProvider, GenericEntityOwnershipListener<P, C> {
60
61     private static final Logger LOG = LoggerFactory.getLogger(AbstractClusterSingletonServiceProviderImpl.class);
62
63     @VisibleForTesting
64     static final String SERVICE_ENTITY_TYPE = "org.opendaylight.mdsal.ServiceEntityType";
65     @VisibleForTesting
66     static final String CLOSE_SERVICE_ENTITY_TYPE = "org.opendaylight.mdsal.AsyncServiceCloseEntityType";
67
68     private final S entityOwnershipService;
69     private final Map<String, ClusterSingletonServiceGroup<P, E, C>> serviceGroupMap = new ConcurrentHashMap<>();
70
71     /* EOS Entity Listeners Registration */
72     private R serviceEntityListenerReg;
73     private R asyncCloseEntityListenerReg;
74
75     /**
76      * Class constructor.
77      *
78      * @param entityOwnershipService relevant EOS
79      */
80     protected AbstractClusterSingletonServiceProviderImpl(@Nonnull final S entityOwnershipService) {
81         this.entityOwnershipService = Preconditions.checkNotNull(entityOwnershipService);
82     }
83
84     /**
85      * This method must be called once on startup to initialize this provider.
86      */
87     public final void initializeProvider() {
88         LOG.debug("Initialization method for ClusterSingletonService Provider {}", this);
89         this.serviceEntityListenerReg = registerListener(SERVICE_ENTITY_TYPE, entityOwnershipService);
90         this.asyncCloseEntityListenerReg = registerListener(CLOSE_SERVICE_ENTITY_TYPE, entityOwnershipService);
91     }
92
93     @Override
94     public final synchronized ClusterSingletonServiceRegistration registerClusterSingletonService(
95             @CheckForNull final ClusterSingletonService service) {
96         LOG.debug("Call registrationService {} method for ClusterSingletonService Provider {}", service, this);
97
98         final String serviceIdentifier = service.getIdentifier().getValue();
99         Preconditions.checkArgument(!Strings.isNullOrEmpty(serviceIdentifier),
100                 "ClusterSingletonService identifier may not be null nor empty");
101
102         final ClusterSingletonServiceGroup<P, E, C> serviceGroup;
103         ClusterSingletonServiceGroup<P, E, C> existing = serviceGroupMap.get(serviceIdentifier);
104         if (existing == null) {
105             serviceGroup = createGroup(serviceIdentifier, new ArrayList<>(1));
106             serviceGroupMap.put(serviceIdentifier, serviceGroup);
107
108             try {
109                 initializeOrRemoveGroup(serviceGroup);
110             } catch (CandidateAlreadyRegisteredException e) {
111                 throw new IllegalArgumentException("Service group already registered", e);
112             }
113         } else {
114             serviceGroup = existing;
115         }
116
117         final ClusterSingletonServiceRegistration reg =  new AbstractClusterSingletonServiceRegistration(service) {
118             @Override
119             protected void removeRegistration() {
120                 // We need to bounce the unregistration through a ordered lock in order not to deal with asynchronous
121                 // shutdown of the group and user registering it again.
122                 AbstractClusterSingletonServiceProviderImpl.this.removeRegistration(serviceIdentifier, this);
123             }
124         };
125
126         serviceGroup.registerService(reg);
127         return reg;
128     }
129
130     private ClusterSingletonServiceGroup<P, E, C> createGroup(final String serviceIdentifier,
131             final List<ClusterSingletonServiceRegistration> services) {
132         return new ClusterSingletonServiceGroupImpl<>(serviceIdentifier, entityOwnershipService,
133                 createEntity(SERVICE_ENTITY_TYPE, serviceIdentifier),
134                 createEntity(CLOSE_SERVICE_ENTITY_TYPE, serviceIdentifier), services);
135     }
136
137     private void initializeOrRemoveGroup(final ClusterSingletonServiceGroup<P, E, C> group)
138             throws CandidateAlreadyRegisteredException {
139         try {
140             group.initialize();
141         } catch (CandidateAlreadyRegisteredException e) {
142             serviceGroupMap.remove(group.getIdentifier(), group);
143             throw e;
144         }
145     }
146
147     void removeRegistration(final String serviceIdentifier, final ClusterSingletonServiceRegistration reg) {
148
149         final PlaceholderGroup<P, E, C> placeHolder;
150         final ListenableFuture<?> future;
151         synchronized (this) {
152             final ClusterSingletonServiceGroup<P, E, C> lookup = verifyNotNull(serviceGroupMap.get(serviceIdentifier));
153             future = lookup.unregisterService(reg);
154             if (future == null) {
155                 return;
156             }
157
158             // Close the group and replace it with a placeholder
159             LOG.debug("Closing service group {}", serviceIdentifier);
160             placeHolder = new PlaceholderGroup<>(lookup, future);
161
162             final String identifier = reg.getInstance().getIdentifier().getValue();
163             verify(serviceGroupMap.replace(identifier, lookup, placeHolder));
164             LOG.debug("Replaced group {} with {}", serviceIdentifier, placeHolder);
165
166             lookup.closeClusterSingletonGroup();
167         }
168
169         future.addListener(() -> finishShutdown(placeHolder), MoreExecutors.directExecutor());
170     }
171
172     synchronized void finishShutdown(final PlaceholderGroup<P, E, C> placeHolder) {
173         final String identifier = placeHolder.getIdentifier();
174         LOG.debug("Service group {} closed", identifier);
175
176         final List<ClusterSingletonServiceRegistration> services = placeHolder.getServices();
177         if (services.isEmpty()) {
178             // No services, we are all done
179             if (serviceGroupMap.remove(identifier, placeHolder)) {
180                 LOG.debug("Service group {} removed", placeHolder);
181             } else {
182                 LOG.debug("Service group {} superseded by {}", placeHolder, serviceGroupMap.get(identifier));
183             }
184             return;
185         }
186
187         // Placeholder is being retired, we are reusing its services as the seed for the group.
188         final ClusterSingletonServiceGroup<P, E, C> group = createGroup(identifier, services);
189         Verify.verify(serviceGroupMap.replace(identifier, placeHolder, group));
190         placeHolder.setSuccessor(group);
191         LOG.debug("Service group upgraded from {} to {}", placeHolder, group);
192
193         try {
194             initializeOrRemoveGroup(group);
195         } catch (CandidateAlreadyRegisteredException e) {
196             LOG.error("Failed to register delayed group {}, it will remain inoperational", identifier, e);
197         }
198     }
199
200     @Override
201     public final void close() {
202         LOG.debug("Close method for ClusterSingletonService Provider {}", this);
203
204         if (serviceEntityListenerReg != null) {
205             serviceEntityListenerReg.close();
206             serviceEntityListenerReg = null;
207         }
208
209         final List<ListenableFuture<?>> listGroupCloseListFuture = new ArrayList<>();
210
211         for (final ClusterSingletonServiceGroup<P, E, C> serviceGroup : serviceGroupMap.values()) {
212             listGroupCloseListFuture.add(serviceGroup.closeClusterSingletonGroup());
213         }
214
215         final ListenableFuture<List<Object>> finalCloseFuture = Futures.allAsList(listGroupCloseListFuture);
216         Futures.addCallback(finalCloseFuture, new FutureCallback<List<?>>() {
217
218             @Override
219             public void onSuccess(final List<?> result) {
220                 cleanup();
221             }
222
223             @Override
224             public void onFailure(final Throwable throwable) {
225                 LOG.warn("Unexpected problem by closing ClusterSingletonServiceProvider {}",
226                     AbstractClusterSingletonServiceProviderImpl.this, throwable);
227                 cleanup();
228             }
229         }, MoreExecutors.directExecutor());
230     }
231
232     @Override
233     public final void ownershipChanged(final C ownershipChange) {
234         LOG.debug("Ownership change for ClusterSingletonService Provider {}", ownershipChange);
235         final String serviceIdentifier = getServiceIdentifierFromEntity(ownershipChange.getEntity());
236         final ClusterSingletonServiceGroup<P, E, C> serviceHolder = serviceGroupMap.get(serviceIdentifier);
237         if (serviceHolder != null) {
238             serviceHolder.ownershipChanged(ownershipChange);
239         } else {
240             LOG.debug("ClusterSingletonServiceGroup was not found for serviceIdentifier {}", serviceIdentifier);
241         }
242     }
243
244     /**
245      * Method implementation registers a defined {@link GenericEntityOwnershipListenerRegistration} type
246      * EntityOwnershipListenerRegistration.
247      *
248      * @param entityType the type of the entity
249      * @param entityOwnershipServiceInst - EOS type
250      * @return instance of EntityOwnershipListenerRegistration
251      */
252     protected abstract R registerListener(String entityType, S entityOwnershipServiceInst);
253
254     /**
255      * Creates an extended {@link GenericEntity} instance.
256      *
257      * @param entityType the type of the entity
258      * @param entityIdentifier the identifier of the entity
259      * @return instance of Entity extended GenericEntity type
260      */
261     protected abstract E createEntity(String entityType, String entityIdentifier);
262
263     /**
264      * Method is responsible for parsing ServiceGroupIdentifier from E entity.
265      *
266      * @param entity instance of GenericEntity type
267      * @return ServiceGroupIdentifier parsed from entity key value.
268      */
269     protected abstract String getServiceIdentifierFromEntity(E entity);
270
271     /**
272      * Method is called async. from close method in end of Provider lifecycle.
273      */
274     final void cleanup() {
275         LOG.debug("Final cleaning ClusterSingletonServiceProvider {}", this);
276         if (asyncCloseEntityListenerReg != null) {
277             asyncCloseEntityListenerReg.close();
278             asyncCloseEntityListenerReg = null;
279         }
280         serviceGroupMap.clear();
281     }
282 }