/*
 * Copyright (C) 2014 EBay Software Foundation
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 *
 * Authors : Ashwin Raveendran
 */
10 package org.opendaylight.ovsdb.lib;
12 import com.fasterxml.jackson.databind.JsonNode;
13 import com.google.common.base.Function;
14 import com.google.common.collect.ImmutableMap;
15 import com.google.common.collect.Lists;
16 import com.google.common.collect.Maps;
17 import com.google.common.util.concurrent.FutureCallback;
18 import com.google.common.util.concurrent.Futures;
19 import com.google.common.util.concurrent.ListenableFuture;
20 import com.google.common.util.concurrent.SettableFuture;
21 import org.opendaylight.ovsdb.lib.jsonrpc.Params;
22 import org.opendaylight.ovsdb.lib.message.MonitorRequest;
23 import org.opendaylight.ovsdb.lib.message.OvsdbRPC;
24 import org.opendaylight.ovsdb.lib.message.TableUpdates;
25 import org.opendaylight.ovsdb.lib.message.TransactBuilder;
26 import org.opendaylight.ovsdb.lib.message.UpdateNotification;
27 import org.opendaylight.ovsdb.lib.operations.Operation;
28 import org.opendaylight.ovsdb.lib.operations.OperationResult;
29 import org.opendaylight.ovsdb.lib.operations.TransactionBuilder;
30 import org.opendaylight.ovsdb.lib.schema.DatabaseSchema;
31 import org.opendaylight.ovsdb.lib.schema.TableSchema;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
35 import java.util.HashMap;
36 import java.util.List;
38 import java.util.Queue;
39 import java.util.UUID;
40 import java.util.concurrent.ExecutorService;
43 public class OvsDBClientImpl implements OvsDBClient {
45 protected static final Logger logger = LoggerFactory.getLogger(OvsDBClientImpl.class);
46 private ExecutorService executorService;
48 private Map<String, DatabaseSchema> schema = Maps.newHashMap();
49 private HashMap<String, MonitorCallBack> monitorCallbacks = Maps.newHashMap();
50 private Queue<Throwable> exceptions;
51 private OvsdbRPC.Callback rpcCallback;
53 public OvsDBClientImpl(OvsdbRPC rpc, ExecutorService executorService) {
55 this.executorService = executorService;
61 void setupUpdateListner() {
62 if (rpcCallback == null) {
63 OvsdbRPC.Callback temp = new OvsdbRPC.Callback() {
65 public void update(Object node, UpdateNotification upadateNotification) {
66 Object key = upadateNotification.getContext();
67 MonitorCallBack monitorCallBack = monitorCallbacks.get(key);
68 if (monitorCallBack == null) {
70 logger.info("callback received with context {}, but no known handler. Ignoring!", key);
73 monitorCallBack.update(upadateNotification.getUpdate());
77 public void locked(Object node, List<String> ids) {
82 public void stolen(Object node, List<String> ids) {
86 this.rpcCallback = temp;
87 rpc.registerCallback(temp);
92 public ListenableFuture<List<OperationResult>> transact(List<Operation> operations) {
94 //todo, we may not need transactionbuilder if we can have JSON objects
95 TransactBuilder builder = new TransactBuilder();
96 for (Operation o : operations) {
97 builder.addOperation(o);
100 return rpc.transact(builder);
104 public <E extends TableSchema<E>> MonitorHandle monitor(final DatabaseSchema dbSchema,
105 List<MonitorRequest<E>> monitorRequest,
106 final MonitorCallBack callback) {
108 final ImmutableMap<String, MonitorRequest<E>> reqMap = Maps.uniqueIndex(monitorRequest,
109 new Function<MonitorRequest<E>, String>() {
111 public String apply(MonitorRequest<E> input) {
112 return input.getTableName();
116 final MonitorHandle monitorHandle = new MonitorHandle(UUID.randomUUID().toString());
117 registerCallback(monitorHandle, callback);
119 ListenableFuture<TableUpdates> monitor = rpc.monitor(new Params() {
121 public List<Object> params() {
122 return Lists.<Object>newArrayList(dbSchema.getName(), monitorHandle.getId(), reqMap);
125 Futures.addCallback(monitor, new FutureCallback<TableUpdates>() {
127 public void onSuccess(TableUpdates result) {
128 callback.update(result);
132 public void onFailure(Throwable t) {
133 callback.exception(t);
137 return monitorHandle;
140 private void registerCallback(MonitorHandle monitorHandle, MonitorCallBack callback) {
141 this.monitorCallbacks.put(monitorHandle.getId(), callback);
142 setupUpdateListner();
146 public void cancelMonitor(MonitorHandle handler) {
147 throw new UnsupportedOperationException("not yet implemented");
151 public void lock(String lockId, LockAquisitionCallback lockedCallBack, LockStolenCallback stolenCallback) {
152 throw new UnsupportedOperationException("not yet implemented");
156 public ListenableFuture<Boolean> steal(String lockId) {
157 throw new UnsupportedOperationException("not yet implemented");
161 public ListenableFuture<Boolean> unLock(String lockId) {
162 throw new UnsupportedOperationException("not yet implemented");
166 public void startEchoService(EchoServiceCallbackFilters callbackFilters) {
167 throw new UnsupportedOperationException("not yet implemented");
171 public void stopEchoService() {
172 throw new UnsupportedOperationException("not yet implemented");
176 public TransactionBuilder transactBuilder() {
177 return new TransactionBuilder(this);
181 public boolean isReady(int timeout) throws InterruptedException {
182 while (timeout > 0) {
183 if (!schema.isEmpty()) {
193 public ListenableFuture<List<String>> getDatabases() {
194 return rpc.list_dbs();
198 public ListenableFuture<DatabaseSchema> getSchema(final String database, final boolean cacheResult) {
200 DatabaseSchema databaseSchema = schema.get(database);
202 if (databaseSchema == null || cacheResult) {
203 return Futures.transform(
204 getSchemaFromDevice(Lists.newArrayList(database)),
205 new Function<Map<String, DatabaseSchema>, DatabaseSchema>() {
207 public DatabaseSchema apply(Map<String, DatabaseSchema> result) {
208 if (result.containsKey(database)) {
209 DatabaseSchema s = result.get(database);
211 OvsDBClientImpl.this.schema.put(database, s);
222 return Futures.immediateFuture(databaseSchema);
226 private ListenableFuture<Map<String, DatabaseSchema>> getSchemaFromDevice(final List<String> dbNames) {
227 Map<String, DatabaseSchema> schema = Maps.newHashMap();
228 SettableFuture<Map<String, DatabaseSchema>> future = SettableFuture.create();
229 _populateSchema(dbNames, schema, future);
233 private void _populateSchema(final List<String> dbNames,
234 final Map<String, DatabaseSchema> schema,
235 final SettableFuture<Map<String, DatabaseSchema>> sfuture) {
237 if (dbNames == null || dbNames.isEmpty()) {
241 Futures.transform(rpc.get_schema(Lists.newArrayList(dbNames.get(0))),
242 new com.google.common.base.Function<JsonNode, Void>() {
244 public Void apply(JsonNode jsonNode) {
246 schema.put(dbNames.get(0), DatabaseSchema.fromJson(dbNames.get(0), jsonNode));
247 if (schema.size() > 1 && !sfuture.isCancelled()) {
248 _populateSchema(dbNames.subList(1, dbNames.size()), schema, sfuture);
249 } else if (schema.size() == 1) {
252 } catch (Throwable e) {
253 sfuture.setException(e);
260 public void setRpc(OvsdbRPC rpc) {
264 public Queue<Throwable> getExceptions() {