2 * Copyright (c) 2014, 2015 EBay Software Foundation and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.ovsdb.lib.impl;
11 import io.netty.channel.Channel;
13 import java.util.Iterator;
14 import java.util.List;
16 import java.util.UUID;
17 import java.util.concurrent.ExecutionException;
18 import java.util.concurrent.ExecutorService;
20 import org.opendaylight.ovsdb.lib.EchoServiceCallbackFilters;
21 import org.opendaylight.ovsdb.lib.LockAquisitionCallback;
22 import org.opendaylight.ovsdb.lib.LockStolenCallback;
23 import org.opendaylight.ovsdb.lib.MonitorCallBack;
24 import org.opendaylight.ovsdb.lib.MonitorHandle;
25 import org.opendaylight.ovsdb.lib.OvsdbClient;
26 import org.opendaylight.ovsdb.lib.OvsdbConnectionInfo;
27 import org.opendaylight.ovsdb.lib.OvsdbConnectionInfo.ConnectionType;
28 import org.opendaylight.ovsdb.lib.jsonrpc.Params;
29 import org.opendaylight.ovsdb.lib.message.MonitorRequest;
30 import org.opendaylight.ovsdb.lib.message.OvsdbRPC;
31 import org.opendaylight.ovsdb.lib.message.TableUpdate;
32 import org.opendaylight.ovsdb.lib.message.TableUpdates;
33 import org.opendaylight.ovsdb.lib.message.TransactBuilder;
34 import org.opendaylight.ovsdb.lib.message.UpdateNotification;
35 import org.opendaylight.ovsdb.lib.notation.Row;
36 import org.opendaylight.ovsdb.lib.operations.Operation;
37 import org.opendaylight.ovsdb.lib.operations.OperationResult;
38 import org.opendaylight.ovsdb.lib.operations.TransactionBuilder;
39 import org.opendaylight.ovsdb.lib.schema.DatabaseSchema;
40 import org.opendaylight.ovsdb.lib.schema.GenericTableSchema;
41 import org.opendaylight.ovsdb.lib.schema.TableSchema;
42 import org.opendaylight.ovsdb.lib.schema.typed.TypedBaseTable;
43 import org.opendaylight.ovsdb.lib.schema.typed.TypedTable;
44 import org.opendaylight.ovsdb.lib.schema.typed.TyperUtils;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
48 import com.fasterxml.jackson.databind.JsonNode;
49 import com.fasterxml.jackson.databind.node.ObjectNode;
50 import com.google.common.base.Function;
51 import com.google.common.collect.ImmutableMap;
52 import com.google.common.collect.Lists;
53 import com.google.common.collect.Maps;
54 import com.google.common.util.concurrent.Futures;
55 import com.google.common.util.concurrent.ListenableFuture;
56 import com.google.common.util.concurrent.SettableFuture;
59 public class OvsdbClientImpl implements OvsdbClient {
// Class-wide SLF4J logger.
61 private static final Logger LOG = LoggerFactory.getLogger(OvsdbClientImpl.class);
// Executor handed in through the constructor; no use of it is visible in this listing.
62 private ExecutorService executorService;
// NOTE(review): methods below dereference a field named `rpc`, but its declaration is not among
// the lines shown here - confirm it exists in the full file.
// Database schemas keyed by database name; written by getSchema(), read by getDatabaseSchema(),
// isReady() and transformingCallback().
64 private Map<String, DatabaseSchema> schema = Maps.newHashMap();
// Monitor callbacks keyed by MonitorHandle id (see registerCallback()).
65 private Map<String, CallbackContext> monitorCallbacks = Maps.newHashMap();
// Assigned by setupUpdateListener() the first time it runs; null before that.
66 private OvsdbRPC.Callback rpcCallback;
// Built in the constructor from the channel and the connection type.
67 private OvsdbConnectionInfo connectionInfo;
// Channel backing this client; used by isActive() and disconnect().
68 private Channel channel;
/**
 * Creates a client bound to an established connection.
 *
 * @param rpc             JSON-RPC endpoint used by the request-issuing methods of this client
 * @param channel         channel backing the connection
 * @param type            connection type recorded in the connection info
 * @param executorService executor retained by this client
 */
public OvsdbClientImpl(OvsdbRPC rpc, Channel channel, ConnectionType type, ExecutorService executorService) {
    // The RPC-issuing methods dereference the rpc field, so the endpoint has to be stored here.
    this.rpc = rpc;
    this.executorService = executorService;
    this.channel = channel;
    this.connectionInfo = new OvsdbConnectionInfo(channel, type);
}
81 void setupUpdateListener() {
82 if (rpcCallback == null) {
83 OvsdbRPC.Callback temp = new OvsdbRPC.Callback() {
85 public void update(Object node, UpdateNotification updateNotification) {
86 Object key = updateNotification.getContext();
87 CallbackContext callbackContext = monitorCallbacks.get(key);
88 MonitorCallBack monitorCallBack = callbackContext.monitorCallBack;
89 if (monitorCallBack == null) {
91 LOG.info("callback received with context {}, but no known handler. Ignoring!", key);
94 TableUpdates updates = transformingCallback(updateNotification.getUpdates(),
95 callbackContext.schema);
96 monitorCallBack.update(updates, callbackContext.schema);
100 public void locked(Object node, List<String> ids) {
105 public void stolen(Object node, List<String> ids) {
109 this.rpcCallback = temp;
110 rpc.registerCallback(temp);
115 protected TableUpdates transformingCallback(JsonNode tableUpdatesJson, DatabaseSchema dbSchema) {
116 //todo(ashwin): we should move all the JSON parsing logic to a utility class
117 if (tableUpdatesJson instanceof ObjectNode) {
118 Map<String, TableUpdate> tableUpdateMap = Maps.newHashMap();
119 ObjectNode updatesJson = (ObjectNode) tableUpdatesJson;
120 for (Iterator<Map.Entry<String,JsonNode>> itr = updatesJson.fields(); itr.hasNext();) {
121 Map.Entry<String, JsonNode> entry = itr.next();
123 DatabaseSchema databaseSchema = this.schema.get(dbSchema.getName());
124 TableSchema table = databaseSchema.table(entry.getKey(), TableSchema.class);
125 tableUpdateMap.put(entry.getKey(), table.updatesFromJson(entry.getValue()));
128 return new TableUpdates(tableUpdateMap);
134 public ListenableFuture<List<OperationResult>> transact(DatabaseSchema dbSchema, List<Operation> operations) {
136 //todo, we may not need transactionbuilder if we can have JSON objects
137 TransactBuilder builder = new TransactBuilder(dbSchema);
138 for (Operation operation : operations) {
139 builder.addOperation(operation);
142 return FutureTransformUtils.transformTransactResponse(rpc.transact(builder), operations);
146 public <E extends TableSchema<E>> TableUpdates monitor(final DatabaseSchema dbSchema,
147 List<MonitorRequest> monitorRequest,
148 final MonitorCallBack callback) {
150 final ImmutableMap<String, MonitorRequest> reqMap = Maps.uniqueIndex(monitorRequest,
151 new Function<MonitorRequest, String>() {
153 public String apply(MonitorRequest input) {
154 return input.getTableName();
158 final MonitorHandle monitorHandle = new MonitorHandle(UUID.randomUUID().toString());
159 registerCallback(monitorHandle, callback, dbSchema);
161 ListenableFuture<JsonNode> monitor = rpc.monitor(new Params() {
163 public List<Object> params() {
164 return Lists.<Object>newArrayList(dbSchema.getName(), monitorHandle.getId(), reqMap);
169 result = monitor.get();
170 } catch (InterruptedException | ExecutionException e) {
173 return transformingCallback(result, dbSchema);
/**
 * Starts monitoring under a caller-supplied handle: registers the callback, sends the
 * monitor request and waits for the reply, then converts the reply with
 * transformingCallback().
 *
 * NOTE(review): the declaration of `result` and the body of the catch block are not
 * visible in this listing - confirm what is returned and whether the interrupt status is
 * restored when waiting for the reply fails.
 */
177 public <E extends TableSchema<E>> TableUpdates monitor(final DatabaseSchema dbSchema,
178 List<MonitorRequest> monitorRequest,
179 final MonitorHandle monitorHandle,
180 final MonitorCallBack callback) {
// Index the requests by table name; this map is sent as the third request parameter.
182 final ImmutableMap<String, MonitorRequest> reqMap = Maps.uniqueIndex(monitorRequest,
183 new Function<MonitorRequest, String>() {
185 public String apply(MonitorRequest input) {
186 return input.getTableName();
// The callback is registered before the request is sent.
190 registerCallback(monitorHandle, callback, dbSchema);
192 ListenableFuture<JsonNode> monitor = rpc.monitor(new Params() {
194 public List<Object> params() {
// Request parameters: database name, monitor id, per-table requests.
195 return Lists.<Object>newArrayList(dbSchema.getName(), monitorHandle.getId(), reqMap);
// Blocks the calling thread until the reply arrives.
200 result = monitor.get();
201 } catch (InterruptedException | ExecutionException e) {
204 return transformingCallback(result, dbSchema);
207 private void registerCallback(MonitorHandle monitorHandle, MonitorCallBack callback, DatabaseSchema schema) {
208 this.monitorCallbacks.put(monitorHandle.getId(), new CallbackContext(callback, schema));
209 setupUpdateListener();
213 public void cancelMonitor(final MonitorHandle handler) {
214 ListenableFuture<JsonNode> cancelMonitor = rpc.monitor_cancel(new Params() {
216 public List<Object> params() {
217 return Lists.<Object>newArrayList(handler.getId());
221 JsonNode result = null;
223 result = cancelMonitor.get();
224 } catch (InterruptedException | ExecutionException e) {
225 LOG.error("Exception when canceling monitor handler {}", handler.getId());
228 if (result == null) {
229 LOG.error("Fail to cancel monitor with handler {}", handler.getId());
231 LOG.debug("Successfully cancel monitoring for handler {}", handler.getId());
236 public void lock(String lockId, LockAquisitionCallback lockedCallBack, LockStolenCallback stolenCallback) {
237 throw new UnsupportedOperationException("not yet implemented");
241 public ListenableFuture<Boolean> steal(String lockId) {
242 throw new UnsupportedOperationException("not yet implemented");
246 public ListenableFuture<Boolean> unLock(String lockId) {
247 throw new UnsupportedOperationException("not yet implemented");
251 public void startEchoService(EchoServiceCallbackFilters callbackFilters) {
252 throw new UnsupportedOperationException("not yet implemented");
256 public void stopEchoService() {
257 throw new UnsupportedOperationException("not yet implemented");
261 public TransactionBuilder transactBuilder(DatabaseSchema dbSchema) {
262 return new TransactionBuilder(this, dbSchema);
/**
 * Checks, while the timeout budget is positive, whether any database schema has been cached.
 *
 * NOTE(review): the rest of the loop (return values, how long it waits between checks and
 * how `timeout` is decremented) is not visible in this listing - confirm the unit of
 * `timeout` against the full file.
 */
266 public boolean isReady(int timeout) throws InterruptedException {
// Keep checking only while the remaining budget is positive.
267 while (timeout > 0) {
// A non-empty schema cache is the condition being waited for.
268 if (!schema.isEmpty()) {
278 public ListenableFuture<List<String>> getDatabases() {
279 return rpc.list_dbs();
/**
 * Returns the schema of the named database, from the local cache when present and
 * otherwise by fetching it through getSchemaFromDevice() and caching the result.
 *
 * NOTE(review): the remainder of the transform function (what it returns, and what happens
 * when the fetched map lacks the database) and any further arguments to Futures.transform
 * are not visible in this listing.
 */
283 public ListenableFuture<DatabaseSchema> getSchema(final String database) {
// Look in the cache first.
285 DatabaseSchema databaseSchema = schema.get(database);
287 if (databaseSchema == null) {
// Cache miss: fetch the schema and post-process the result in the transform below.
288 return Futures.transform(
289 getSchemaFromDevice(Lists.newArrayList(database)),
290 new Function<Map<String, DatabaseSchema>, DatabaseSchema>() {
292 public DatabaseSchema apply(Map<String, DatabaseSchema> result) {
293 if (result.containsKey(database)) {
294 DatabaseSchema dbSchema = result.get(database);
// Internally generated columns are populated before the schema is stored in the cache.
295 dbSchema.populateInternallyGeneratedColumns();
296 OvsdbClientImpl.this.schema.put(database, dbSchema);
// Cache hit: hand back the cached schema as an already-completed future.
304 return Futures.immediateFuture(databaseSchema);
308 private ListenableFuture<Map<String, DatabaseSchema>> getSchemaFromDevice(final List<String> dbNames) {
309 Map<String, DatabaseSchema> schema = Maps.newHashMap();
310 SettableFuture<Map<String, DatabaseSchema>> future = SettableFuture.create();
311 populateSchema(dbNames, schema, future);
/**
 * Requests the schema of the first name in dbNames, stores the parsed result in the given
 * map, and may recurse on the remaining names. Failures inside the response handler are
 * reported through sfuture.setException().
 *
 * NOTE(review): several lines are not visible in this listing (what happens when dbNames is
 * null or empty, what the schema.size() == 1 branch does, and the handler's return) -
 * confirm that sfuture is completed on every path.
 * NOTE(review): recursion is guarded by schema.size() > 1, which looks at the number of
 * schemas already stored rather than the number of names left - verify the intended
 * behaviour for more than one database.
 */
315 private void populateSchema(final List<String> dbNames,
316 final Map<String, DatabaseSchema> schema,
317 final SettableFuture<Map<String, DatabaseSchema>> sfuture) {
// Nothing to request when no names are given.
319 if (dbNames == null || dbNames.isEmpty()) {
// Request only the first name; the handler below deals with the response.
323 Futures.transform(rpc.get_schema(Lists.newArrayList(dbNames.get(0))),
324 new com.google.common.base.Function<JsonNode, Void>() {
326 public Void apply(JsonNode jsonNode) {
// Parse the response and store it under the requested database name.
328 schema.put(dbNames.get(0), DatabaseSchema.fromJson(dbNames.get(0), jsonNode));
329 if (schema.size() > 1 && !sfuture.isCancelled()) {
// Continue with the remaining names.
330 populateSchema(dbNames.subList(1, dbNames.size()), schema, sfuture);
331 } else if (schema.size() == 1) {
334 } catch (Exception e) {
// Any failure while handling the response fails the future.
335 sfuture.setException(e);
342 public void setRpc(OvsdbRPC rpc) {
346 static class CallbackContext {
347 MonitorCallBack monitorCallBack;
348 DatabaseSchema schema;
350 CallbackContext(MonitorCallBack monitorCallBack, DatabaseSchema schema) {
351 this.monitorCallBack = monitorCallBack;
352 this.schema = schema;
357 public DatabaseSchema getDatabaseSchema(String dbName) {
358 return schema.get(dbName);
362 * This method finds the DatabaseSchema that matches a given Typed Table Class.
363 * With the introduction of TypedTable and TypedColumn annotations, it is possible to express
364 * the Database Name, Table Name & the Database Versions within which the Table is defined and maintained.
366 * @param klazz Typed Class that represents a Table
367 * @return DatabaseSchema that matches a Typed Table Class
369 private <T> DatabaseSchema getDatabaseSchemaForTypedTable(Class<T> klazz) {
370 TypedTable typedTable = klazz.getAnnotation(TypedTable.class);
371 if (typedTable != null) {
372 return this.getDatabaseSchema(typedTable.database());
378 * User friendly convenient method that make use of TyperUtils.getTypedRowWrapper to create a Typed Row Proxy
379 * given the Typed Table Class
381 * @param klazz Typed Interface
382 * @return Proxy wrapper for the actual raw Row class.
385 public <T extends TypedBaseTable<?>> T createTypedRowWrapper(Class<T> klazz) {
386 DatabaseSchema dbSchema = getDatabaseSchemaForTypedTable(klazz);
387 return this.createTypedRowWrapper(dbSchema, klazz);
391 * User friendly convenient method that make use of getTypedRowWrapper to create a Typed Row Proxy given
392 * DatabaseSchema and Typed Table Class.
394 * @param dbSchema Database Schema of interest
395 * @param klazz Typed Interface
396 * @return Proxy wrapper for the actual raw Row class.
399 public <T extends TypedBaseTable<?>> T createTypedRowWrapper(DatabaseSchema dbSchema, Class<T> klazz) {
400 return TyperUtils.getTypedRowWrapper(dbSchema, klazz, new Row<GenericTableSchema>());
404 * User friendly convenient method to get a Typed Row Proxy given a Typed Table Class and the Row to be wrapped.
406 * @param klazz Typed Interface
407 * @param row The actual Row that the wrapper is operating on.
408 * It can be null if the caller is just interested in getting ColumnSchema.
409 * @return Proxy wrapper for the actual raw Row class.
413 public <T extends TypedBaseTable<?>> T getTypedRowWrapper(final Class<T> klazz, final Row<GenericTableSchema> row) {
414 DatabaseSchema dbSchema = getDatabaseSchemaForTypedTable(klazz);
415 return TyperUtils.getTypedRowWrapper(dbSchema, klazz, row);
419 public OvsdbConnectionInfo getConnectionInfo() {
420 return connectionInfo;
424 public boolean isActive() {
425 return channel.isActive();
429 public void disconnect() {
430 channel.disconnect();