Remove @(Not)ThreadSafe annotation
[mdsal.git] / singleton-service / mdsal-singleton-dom-impl / src / main / java / org / opendaylight / mdsal / singleton / dom / impl / AbstractClusterSingletonServiceProviderImpl.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.mdsal.singleton.dom.impl;
9
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static com.google.common.base.Verify.verify;
12 import static com.google.common.base.Verify.verifyNotNull;
13 import static java.util.Objects.requireNonNull;
14
15 import com.google.common.annotations.VisibleForTesting;
16 import com.google.common.base.Strings;
17 import com.google.common.util.concurrent.FutureCallback;
18 import com.google.common.util.concurrent.Futures;
19 import com.google.common.util.concurrent.ListenableFuture;
20 import com.google.common.util.concurrent.MoreExecutors;
21 import java.util.ArrayList;
22 import java.util.List;
23 import java.util.Map;
24 import java.util.concurrent.ConcurrentHashMap;
25 import org.eclipse.jdt.annotation.NonNull;
26 import org.opendaylight.mdsal.eos.common.api.CandidateAlreadyRegisteredException;
27 import org.opendaylight.mdsal.eos.common.api.GenericEntity;
28 import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipChange;
29 import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipListener;
30 import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipListenerRegistration;
31 import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipService;
32 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
33 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
34 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
35 import org.opendaylight.yangtools.concepts.Path;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
38
39 /**
40  * Abstract class {@link AbstractClusterSingletonServiceProviderImpl} represents implementations of
41  * {@link ClusterSingletonServiceProvider} and it implements {@link GenericEntityOwnershipListener}
42  * for providing OwnershipChange for all registered {@link ClusterSingletonServiceGroup} entity
43  * candidate.
44  *
45  * @param <P> the instance identifier path type
46  * @param <E> the GenericEntity type
47  * @param <C> the GenericEntityOwnershipChange type
48  * @param <G> the GenericEntityOwnershipListener type
49  * @param <S> the GenericEntityOwnershipService type
50  * @param <R> the GenericEntityOwnershipListenerRegistration type
51  */
52 public abstract class AbstractClusterSingletonServiceProviderImpl<P extends Path<P>, E extends GenericEntity<P>,
53         C extends GenericEntityOwnershipChange<P, E>,
54         G extends GenericEntityOwnershipListener<P, C>,
55         S extends GenericEntityOwnershipService<P, E, G>,
56         R extends GenericEntityOwnershipListenerRegistration<P, G>>
57         implements ClusterSingletonServiceProvider, GenericEntityOwnershipListener<P, C> {
58
59     private static final Logger LOG = LoggerFactory.getLogger(AbstractClusterSingletonServiceProviderImpl.class);
60
61     @VisibleForTesting
62     static final String SERVICE_ENTITY_TYPE = "org.opendaylight.mdsal.ServiceEntityType";
63     @VisibleForTesting
64     static final String CLOSE_SERVICE_ENTITY_TYPE = "org.opendaylight.mdsal.AsyncServiceCloseEntityType";
65
66     private final S entityOwnershipService;
67     private final Map<String, ClusterSingletonServiceGroup<P, E, C>> serviceGroupMap = new ConcurrentHashMap<>();
68
69     /* EOS Entity Listeners Registration */
70     private R serviceEntityListenerReg;
71     private R asyncCloseEntityListenerReg;
72
73     /**
74      * Class constructor.
75      *
76      * @param entityOwnershipService relevant EOS
77      */
78     protected AbstractClusterSingletonServiceProviderImpl(final @NonNull S entityOwnershipService) {
79         this.entityOwnershipService = requireNonNull(entityOwnershipService);
80     }
81
82     /**
83      * This method must be called once on startup to initialize this provider.
84      */
85     public final void initializeProvider() {
86         LOG.debug("Initialization method for ClusterSingletonService Provider {}", this);
87         this.serviceEntityListenerReg = registerListener(SERVICE_ENTITY_TYPE, entityOwnershipService);
88         this.asyncCloseEntityListenerReg = registerListener(CLOSE_SERVICE_ENTITY_TYPE, entityOwnershipService);
89     }
90
91     @Override
92     public final synchronized ClusterSingletonServiceRegistration registerClusterSingletonService(
93             final ClusterSingletonService service) {
94         LOG.debug("Call registrationService {} method for ClusterSingletonService Provider {}", service, this);
95
96         final String serviceIdentifier = service.getIdentifier().getValue();
97         checkArgument(!Strings.isNullOrEmpty(serviceIdentifier),
98             "ClusterSingletonService identifier may not be null nor empty");
99
100         final ClusterSingletonServiceGroup<P, E, C> serviceGroup;
101         ClusterSingletonServiceGroup<P, E, C> existing = serviceGroupMap.get(serviceIdentifier);
102         if (existing == null) {
103             serviceGroup = createGroup(serviceIdentifier, new ArrayList<>(1));
104             serviceGroupMap.put(serviceIdentifier, serviceGroup);
105
106             try {
107                 initializeOrRemoveGroup(serviceGroup);
108             } catch (CandidateAlreadyRegisteredException e) {
109                 throw new IllegalArgumentException("Service group already registered", e);
110             }
111         } else {
112             serviceGroup = existing;
113         }
114
115         final ClusterSingletonServiceRegistration reg =  new AbstractClusterSingletonServiceRegistration(service) {
116             @Override
117             protected void removeRegistration() {
118                 // We need to bounce the unregistration through a ordered lock in order not to deal with asynchronous
119                 // shutdown of the group and user registering it again.
120                 AbstractClusterSingletonServiceProviderImpl.this.removeRegistration(serviceIdentifier, this);
121             }
122         };
123
124         serviceGroup.registerService(reg);
125         return reg;
126     }
127
128     private ClusterSingletonServiceGroup<P, E, C> createGroup(final String serviceIdentifier,
129             final List<ClusterSingletonServiceRegistration> services) {
130         return new ClusterSingletonServiceGroupImpl<>(serviceIdentifier, entityOwnershipService,
131                 createEntity(SERVICE_ENTITY_TYPE, serviceIdentifier),
132                 createEntity(CLOSE_SERVICE_ENTITY_TYPE, serviceIdentifier), services);
133     }
134
135     private void initializeOrRemoveGroup(final ClusterSingletonServiceGroup<P, E, C> group)
136             throws CandidateAlreadyRegisteredException {
137         try {
138             group.initialize();
139         } catch (CandidateAlreadyRegisteredException e) {
140             serviceGroupMap.remove(group.getIdentifier(), group);
141             throw e;
142         }
143     }
144
145     void removeRegistration(final String serviceIdentifier, final ClusterSingletonServiceRegistration reg) {
146
147         final PlaceholderGroup<P, E, C> placeHolder;
148         final ListenableFuture<?> future;
149         synchronized (this) {
150             final ClusterSingletonServiceGroup<P, E, C> lookup = verifyNotNull(serviceGroupMap.get(serviceIdentifier));
151             future = lookup.unregisterService(reg);
152             if (future == null) {
153                 return;
154             }
155
156             // Close the group and replace it with a placeholder
157             LOG.debug("Closing service group {}", serviceIdentifier);
158             placeHolder = new PlaceholderGroup<>(lookup, future);
159
160             final String identifier = reg.getInstance().getIdentifier().getValue();
161             verify(serviceGroupMap.replace(identifier, lookup, placeHolder));
162             LOG.debug("Replaced group {} with {}", serviceIdentifier, placeHolder);
163
164             lookup.closeClusterSingletonGroup();
165         }
166
167         future.addListener(() -> finishShutdown(placeHolder), MoreExecutors.directExecutor());
168     }
169
170     synchronized void finishShutdown(final PlaceholderGroup<P, E, C> placeHolder) {
171         final String identifier = placeHolder.getIdentifier();
172         LOG.debug("Service group {} closed", identifier);
173
174         final List<ClusterSingletonServiceRegistration> services = placeHolder.getServices();
175         if (services.isEmpty()) {
176             // No services, we are all done
177             if (serviceGroupMap.remove(identifier, placeHolder)) {
178                 LOG.debug("Service group {} removed", placeHolder);
179             } else {
180                 LOG.debug("Service group {} superseded by {}", placeHolder, serviceGroupMap.get(identifier));
181             }
182             return;
183         }
184
185         // Placeholder is being retired, we are reusing its services as the seed for the group.
186         final ClusterSingletonServiceGroup<P, E, C> group = createGroup(identifier, services);
187         verify(serviceGroupMap.replace(identifier, placeHolder, group));
188         placeHolder.setSuccessor(group);
189         LOG.debug("Service group upgraded from {} to {}", placeHolder, group);
190
191         try {
192             initializeOrRemoveGroup(group);
193         } catch (CandidateAlreadyRegisteredException e) {
194             LOG.error("Failed to register delayed group {}, it will remain inoperational", identifier, e);
195         }
196     }
197
198     @Override
199     public final void close() {
200         LOG.debug("Close method for ClusterSingletonService Provider {}", this);
201
202         if (serviceEntityListenerReg != null) {
203             serviceEntityListenerReg.close();
204             serviceEntityListenerReg = null;
205         }
206
207         final List<ListenableFuture<?>> listGroupCloseListFuture = new ArrayList<>();
208
209         for (final ClusterSingletonServiceGroup<P, E, C> serviceGroup : serviceGroupMap.values()) {
210             listGroupCloseListFuture.add(serviceGroup.closeClusterSingletonGroup());
211         }
212
213         final ListenableFuture<List<Object>> finalCloseFuture = Futures.allAsList(listGroupCloseListFuture);
214         Futures.addCallback(finalCloseFuture, new FutureCallback<List<?>>() {
215
216             @Override
217             public void onSuccess(final List<?> result) {
218                 cleanup();
219             }
220
221             @Override
222             public void onFailure(final Throwable throwable) {
223                 LOG.warn("Unexpected problem by closing ClusterSingletonServiceProvider {}",
224                     AbstractClusterSingletonServiceProviderImpl.this, throwable);
225                 cleanup();
226             }
227         }, MoreExecutors.directExecutor());
228     }
229
230     @Override
231     public final void ownershipChanged(final C ownershipChange) {
232         LOG.debug("Ownership change for ClusterSingletonService Provider {}", ownershipChange);
233         final String serviceIdentifier = getServiceIdentifierFromEntity(ownershipChange.getEntity());
234         final ClusterSingletonServiceGroup<P, E, C> serviceHolder = serviceGroupMap.get(serviceIdentifier);
235         if (serviceHolder != null) {
236             serviceHolder.ownershipChanged(ownershipChange);
237         } else {
238             LOG.debug("ClusterSingletonServiceGroup was not found for serviceIdentifier {}", serviceIdentifier);
239         }
240     }
241
242     /**
243      * Method implementation registers a defined {@link GenericEntityOwnershipListenerRegistration} type
244      * EntityOwnershipListenerRegistration.
245      *
246      * @param entityType the type of the entity
247      * @param entityOwnershipServiceInst - EOS type
248      * @return instance of EntityOwnershipListenerRegistration
249      */
250     protected abstract R registerListener(String entityType, S entityOwnershipServiceInst);
251
252     /**
253      * Creates an extended {@link GenericEntity} instance.
254      *
255      * @param entityType the type of the entity
256      * @param entityIdentifier the identifier of the entity
257      * @return instance of Entity extended GenericEntity type
258      */
259     protected abstract E createEntity(String entityType, String entityIdentifier);
260
261     /**
262      * Method is responsible for parsing ServiceGroupIdentifier from E entity.
263      *
264      * @param entity instance of GenericEntity type
265      * @return ServiceGroupIdentifier parsed from entity key value.
266      */
267     protected abstract String getServiceIdentifierFromEntity(E entity);
268
269     /**
270      * Method is called async. from close method in end of Provider lifecycle.
271      */
272     final void cleanup() {
273         LOG.debug("Final cleaning ClusterSingletonServiceProvider {}", this);
274         if (asyncCloseEntityListenerReg != null) {
275             asyncCloseEntityListenerReg.close();
276             asyncCloseEntityListenerReg = null;
277         }
278         serviceGroupMap.clear();
279     }
280 }