forked from RafaelCartenet/mcp-databricks-server
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py.backup
More file actions
82 lines (67 loc) · 2.8 KB
/
main.py.backup
File metadata and controls
82 lines (67 loc) · 2.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
from typing import Optional
from mcp.server.fastmcp import FastMCP
from dbapi import execute_statement
from databricks_formatter import format_query_results
from dotenv import load_dotenv
import os
# Load environment variables from .env file
load_dotenv()
# Configuration constants
DATABRICKS_HOST = os.environ.get("DATABRICKS_HOST", "")
DATABRICKS_TOKEN = os.environ.get("DATABRICKS_TOKEN", "")
DATABRICKS_SQL_WAREHOUSE_ID = os.environ.get("DATABRICKS_SQL_WAREHOUSE_ID", "")
mcp = FastMCP("databricks")
@mcp.tool()
async def execute_sql_query(sql: str, warehouse_id: Optional[str] = None) -> str:
"""Execute a SQL query on Databricks and return the results.
Args:
sql: The SQL query to execute
warehouse_id: Optional warehouse ID to use (defaults to DATABRICKS_SQL_WAREHOUSE_ID environment variable)
"""
try:
result = await execute_statement(sql, warehouse_id)
return format_query_results(result)
except Exception as e:
return f"Error executing SQL query: WAREHOUSE USED IS : {warehouse_id}{str(e)}"
@mcp.tool()
async def list_schemas(catalog: str, warehouse_id: Optional[str] = None) -> str:
"""List all available schemas in a Databricks catalog.
Args:
catalog: The catalog name to list schemas from
warehouse_id: Optional warehouse ID to use (defaults to DATABRICKS_SQL_WAREHOUSE_ID environment variable)
"""
sql = f"SHOW SCHEMAS IN {catalog}"
try:
result = await execute_statement(sql, warehouse_id)
return format_query_results(result)
except Exception as e:
return f"Error listing schemas: {str(e)}"
@mcp.tool()
async def list_tables(schema: str, warehouse_id: Optional[str] = None) -> str:
"""List all tables in a specific schema.
Args:
schema: The schema name to list tables from
warehouse_id: Optional warehouse ID to use (defaults to DATABRICKS_SQL_WAREHOUSE_ID environment variable)
"""
sql = f"SHOW TABLES IN {schema}"
try:
result = await execute_statement(sql, warehouse_id)
return format_query_results(result)
except Exception as e:
return f"Error listing tables: {str(e)}"
@mcp.tool()
async def describe_table(table_name: str, warehouse_id: Optional[str] = None) -> str:
"""Describe a table's schema.
Args:
table_name: The fully qualified table name (e.g., schema.table_name)
warehouse_id: Optional warehouse ID to use (defaults to DATABRICKS_SQL_WAREHOUSE_ID environment variable)
"""
sql = f"DESCRIBE TABLE {table_name}"
try:
result = await execute_statement(sql, warehouse_id)
return format_query_results(result)
except Exception as e:
return f"Error describing table: {str(e)}"
if __name__ == "__main__":
# Initialize and run the server
mcp.run(transport='stdio')