diff --git a/databricks-skills/databricks-ai-functions/1-task-functions.md b/databricks-skills/databricks-ai-functions/1-task-functions.md
index eb989904..ba9fa444 100644
--- a/databricks-skills/databricks-ai-functions/1-task-functions.md
+++ b/databricks-skills/databricks-ai-functions/1-task-functions.md
@@ -27,31 +27,98 @@ df.withColumn("sentiment", expr("ai_analyze_sentiment(review_text)")).display()
 
 **Docs:** https://docs.databricks.com/aws/en/sql/language-manual/functions/ai_classify
 
-**Syntax:** `ai_classify(content, labels)`
-- `content`: STRING — text to classify
-- `labels`: ARRAY\<STRING\> — 2 to 20 mutually exclusive categories
+### V2 Syntax (Recommended)
+
+```
+ai_classify(content, labels [, options])
+```
 
-Returns the matching label or `NULL`.
+| Parameter | Type | Description |
+|-----------|------|-------------|
+| `content` | VARIANT or STRING | Text to classify. Accepts VARIANT output from `ai_parse_document` directly. |
+| `labels` | STRING | JSON array `'["label1","label2"]'` or JSON object with descriptions `'{"label1":"description1","label2":"description2"}'` — 2 to 500 labels |
+| `options` | MAP\<STRING, STRING\> | Optional configuration (see below) |
+
+**Options:**
+
+| Key | Values | Description |
+|-----|--------|-------------|
+| `version` | `'1.0'`, `'2.0'` | Force API version (default: auto-detected from labels format) |
+| `instructions` | STRING (max 20,000 chars) | Additional classification guidance |
+| `multilabel` | `'true'`, `'false'` | Enable multi-label classification (default: `'false'`) |
+
+**Returns:** VARIANT containing `{"response": ["label"], "error_message": null}`. Access the label with `:response[0]` (single-label) or iterate `:response` (multi-label). Returns `NULL` if content is null.
```sql +-- Basic classification with JSON array labels SELECT ticket_text, - ai_classify(ticket_text, ARRAY('urgent', 'not urgent', 'spam')) AS priority + ai_classify(ticket_text, '["urgent", "not urgent", "spam"]'):response[0] AS priority +FROM support_tickets; + +-- Labels with descriptions for better disambiguation +SELECT ticket_text, + ai_classify( + ticket_text, + '{"billing_error":"Payment or invoice issues","shipping_delay":"Delivery or logistics problems","product_defect":"Broken or malfunctioning product","other":"Anything else"}' + ):response[0] AS category +FROM support_tickets; + +-- Multi-label classification +SELECT ticket_text, + ai_classify( + ticket_text, + '["billing", "shipping", "product_quality", "account_access"]', + map('multilabel', 'true') + ):response AS tags FROM support_tickets; ``` ```python from pyspark.sql.functions import expr df = spark.table("support_tickets") + +# Single-label — extract label directly +df.withColumn( + "priority", + expr("ai_classify(ticket_text, '[\"urgent\", \"not urgent\", \"spam\"]'):response[0]") +).display() + +# With instructions for context df.withColumn( "priority", - expr("ai_classify(ticket_text, array('urgent', 'not urgent', 'spam'))") + expr(""" + ai_classify( + ticket_text, + '["urgent", "not urgent", "spam"]', + map('instructions', 'Classify as urgent only if the customer reports a system outage or data loss') + ):response[0] + """) ).display() ``` **Tips:** -- Fewer labels = more consistent results (2–5 is optimal) -- Labels should be mutually exclusive and clearly distinguishable -- Not suitable for multi-label classification — run multiple calls if needed +- Fewer labels = more consistent results (2–5 is optimal for single-label) +- Use label descriptions (`{"label":"description"}` format) when labels are ambiguous +- V2 supports up to 500 labels (vs 20 in v1) — useful for fine-grained taxonomies +- Multi-label mode (`map('multilabel', 'true')`) returns all applicable labels — use when 
categories are not mutually exclusive
+- Labels should be clearly distinguishable to avoid classification noise
+
+### Legacy V1 Syntax
+
+V1 syntax still works but V2 is recommended for new code.
+
+```
+ai_classify(content, labels)
+```
+- `content`: STRING — text to classify
+- `labels`: ARRAY\<STRING\> — 2 to 20 mutually exclusive categories
+
+Returns: STRING (matching label or `NULL`). Access directly — no `:response` path needed.
+
+```sql
+SELECT ai_classify(ticket_text, ARRAY('urgent', 'not urgent', 'spam')) AS priority
+FROM support_tickets;
+```
 
 ---
 
@@ -59,38 +126,199 @@ df.withColumn(
 
 **Docs:** https://docs.databricks.com/aws/en/sql/language-manual/functions/ai_extract
 
-**Syntax:** `ai_extract(content, labels)`
-- `content`: STRING — source text
-- `labels`: ARRAY\<STRING\> — entity types to extract
+### V2 Syntax (Recommended)
+
+```
+ai_extract(content, schema [, options])
+```
 
-Returns a STRUCT where each field name matches a label. Fields are `NULL` if not found.
+| Parameter | Type | Description |
+|-----------|------|-------------|
+| `content` | VARIANT or STRING | Source text. Accepts VARIANT output from `ai_parse_document` directly. |
+| `schema` | STRING | JSON defining extraction structure (see schema formats below) |
+| `options` | MAP\<STRING, STRING\> | Optional configuration (see below) |
+
+**Options:**
+
+| Key | Values | Description |
+|-----|--------|-------------|
+| `version` | `'1.0'`, `'2.0'` | Force API version (default: auto-detected from schema format) |
+| `instructions` | STRING (max 20,000 chars) | Additional extraction guidance |
+
+**Returns:** VARIANT containing `{"response": {...}, "error_message": null}`. Access fields with `:response.field` (SQL) or `["response"]["field"]` (Python DataFrame). Fields are `null` if not found.
+ +### Schema Formats + +The `schema` parameter accepts two formats: + +**Simple array** — just field names (equivalent to v1 behavior): +```json +'["person", "location", "date"]' +``` + +**Typed object** — with types, descriptions, nested objects, and arrays: +```json +'{ + "type": "object", + "properties": { + "vendor_name": {"type": "string", "description": "Company or supplier name"}, + "total_amount": {"type": "number"}, + "is_paid": {"type": "boolean"}, + "status": {"type": "enum", "values": ["pending", "approved", "rejected"]}, + "line_items": { + "type": "array", + "items": { + "type": "object", + "properties": { + "item_code": {"type": "string"}, + "quantity": {"type": "integer"}, + "unit_price": {"type": "number"} + } + } + } + } +}' +``` + +**Supported types:** `string`, `integer`, `number`, `boolean`, `enum` (up to 500 values), `object`, `array` + +**Limits:** max 128 fields, max 7 nesting levels, max 150 characters per field name + +### Examples ```sql --- Extract and access fields directly +-- Simple flat extraction (same fields as v1, but returns VARIANT) SELECT - entities.person, - entities.location, - entities.date + entities:response.person AS person, + entities:response.location AS location, + entities:response.date AS date_mentioned FROM ( SELECT ai_extract( 'John Doe called from New York on 2024-01-15.', - ARRAY('person', 'location', 'date') + '["person", "location", "date"]' ) AS entities - FROM messages ); + +-- Typed schema with descriptions for better accuracy +SELECT ai_extract( + invoice_text, + '{ + "type": "object", + "properties": { + "invoice_number": {"type": "string"}, + "vendor_name": {"type": "string", "description": "Company or supplier name"}, + "issue_date": {"type": "string", "description": "Date in YYYY-MM-DD format"}, + "total_amount": {"type": "number"}, + "tax_id": {"type": "string", "description": "Tax ID, digits only"} + } + }' +):response AS header +FROM invoices; + +-- Nested extraction — arrays of objects (NEW in v2) 
+SELECT ai_extract( + invoice_text, + '{ + "type": "object", + "properties": { + "invoice_number": {"type": "string"}, + "line_items": { + "type": "array", + "items": { + "type": "object", + "properties": { + "item_code": {"type": "string"}, + "description": {"type": "string"}, + "quantity": {"type": "integer"}, + "unit_price": {"type": "number"}, + "total": {"type": "number"} + } + } + } + } + }' +):response AS invoice_data +FROM invoices; ``` ```python from pyspark.sql.functions import expr + df = spark.table("messages") + +# Simple flat extraction df = df.withColumn( "entities", - expr("ai_extract(message, array('person', 'location', 'date'))") + expr("ai_extract(message, '[\"person\", \"location\", \"date\"]')") +) +df.select( + "entities:response.person", + "entities:response.location", + "entities:response.date" +).display() + +# Nested extraction with typed schema +schema = ''' +{ + "type": "object", + "properties": { + "invoice_number": {"type": "string"}, + "vendor_name": {"type": "string"}, + "line_items": { + "type": "array", + "items": { + "type": "object", + "properties": { + "item_code": {"type": "string"}, + "description": {"type": "string"}, + "quantity": {"type": "integer"}, + "unit_price": {"type": "number"}, + "total": {"type": "number"} + } + } + } + } +} +''' +df = spark.table("invoices") +df = df.withColumn("result", expr(f"ai_extract(invoice_text, '{schema.strip()}')")) +df.select( + "result:response.invoice_number", + "result:response.vendor_name", + "result:response.line_items" +).display() +``` + +**Composability with `ai_parse_document`:** V2 accepts VARIANT input directly — you can pass `ai_parse_document` output without casting to STRING: + +```python +df = ( + spark.read.format("binaryFile").load("/Volumes/catalog/schema/docs/") + .withColumn("parsed", expr("ai_parse_document(content)")) + # Pass VARIANT directly to ai_extract — no STRING cast needed + .withColumn("entities", expr("ai_extract(parsed, '[\"date\", \"amount\", 
\"vendor\"]')"))
+    .select("path", "entities:response.*")
+)
-df.select("entities.person", "entities.location", "entities.date").display()
 ```
 
-**Use `ai_query` instead when:** the output has nested arrays or more than ~5 levels of hierarchy.
+**Use `ai_query` instead when:** extraction exceeds 128 fields or 7 nesting levels, requires a custom model endpoint, involves multimodal input, or needs sampling parameter control.
+
+### Legacy V1 Syntax
+
+V1 syntax still works but V2 is recommended for new code.
+
+```
+ai_extract(content, labels)
+```
+- `content`: STRING — source text
+- `labels`: ARRAY\<STRING\> — field names to extract
+
+Returns: STRUCT where each field matches a label (access with dot notation: `entities.person`).
+
+```sql
+SELECT ai_extract('John Doe from New York', ARRAY('person', 'location')) AS entities;
+-- Access: entities.person, entities.location
+```
 
 ---
 
@@ -336,8 +564,8 @@ df = (
 
 # Chain with task-specific functions on the extracted text
 df = (
     df.withColumn("summary", expr("ai_summarize(text_blocks, 50)"))
-      .withColumn("entities", expr("ai_extract(text_blocks, array('date', 'amount', 'vendor'))"))
-      .withColumn("category", expr("ai_classify(text_blocks, array('invoice', 'contract', 'report'))"))
+      .withColumn("entities", expr("ai_extract(text_blocks, '[\"date\", \"amount\", \"vendor\"]')"))
+      .withColumn("category", expr("ai_classify(text_blocks, '[\"invoice\", \"contract\", \"report\"]'):response[0]"))
 )
 df.display()
 ```
diff --git a/databricks-skills/databricks-ai-functions/2-ai-query.md b/databricks-skills/databricks-ai-functions/2-ai-query.md
index 60d860fa..804f706e 100644
--- a/databricks-skills/databricks-ai-functions/2-ai-query.md
+++ b/databricks-skills/databricks-ai-functions/2-ai-query.md
@@ -6,7 +6,7 @@
 
 ## When to Use `ai_query`
 
-- Output schema has **nested arrays or deeply nested STRUCTs** (e.g., `itens: [{codigo, descricao, qtde}]`)
+- Output schema exceeds **128 fields or 7 nesting levels** (beyond `ai_extract` v2 limits). 
For most nested extraction including line-item arrays, prefer `ai_extract` v2 with a JSON schema — see [1-task-functions.md](1-task-functions.md#ai_extract) - Calling a **custom Model Serving endpoint** (your own fine-tuned model) - **Multimodal input** — passing binary image files via `files =>` - **Cross-document reasoning** — prompt includes content from multiple sources diff --git a/databricks-skills/databricks-ai-functions/4-document-processing-pipeline.md b/databricks-skills/databricks-ai-functions/4-document-processing-pipeline.md index cb8afbd6..f2b4f03d 100644 --- a/databricks-skills/databricks-ai-functions/4-document-processing-pipeline.md +++ b/databricks-skills/databricks-ai-functions/4-document-processing-pipeline.md @@ -13,11 +13,11 @@ When processing documents with AI Functions, apply this order of preference for | Stage | Preferred function | Use `ai_query` when... | |---|---|---| | Parse binary docs (PDF, DOCX, images) | `ai_parse_document` | Need image-level reasoning | -| Extract flat fields from text | `ai_extract` | Schema has nested arrays | -| Classify document type or status | `ai_classify` | More than 20 categories | +| Extract structured fields (flat or nested) | `ai_extract` v2 (JSON schema) | >128 fields, >7 levels, or custom model needed | +| Classify document type or status | `ai_classify` v2 (2–500 labels) | Need custom model control | | Score item similarity / matching | `ai_similarity` | Need cross-document reasoning | | Summarize long sections | `ai_summarize` | — | -| Extract nested JSON (e.g. line items) | `ai_query` with `responseFormat` | (This is the intended use case) | +| Extreme complexity / custom model / multimodal | `ai_query` with `responseFormat` | >128 fields, >7 nesting levels, custom endpoints, or image input | --- @@ -79,8 +79,8 @@ Each logical step in your document workflow maps to a `@dlt.table` stage. 
Data f ``` [Landing Volume] → Stage 1: ai_parse_document - → Stage 2: ai_classify (document type) - → Stage 3: ai_extract (flat fields) + ai_query (nested JSON) + → Stage 2: ai_classify v2 (document type) + → Stage 3: ai_extract v2 (flat + nested fields) → Stage 4: ai_similarity (item matching) → Stage 5: Final Delta output table ``` @@ -90,7 +90,7 @@ Each logical step in your document workflow maps to a `@dlt.table` stage. Data f ```python import dlt import yaml -from pyspark.sql.functions import expr, col, from_json +from pyspark.sql.functions import expr, col CFG = yaml.safe_load(open("/Workspace/path/to/config.yml")) ENDPOINT = CFG["models"]["default"] @@ -116,7 +116,7 @@ def raw_parsed(): # ── Stage 2: Classify document type ────────────────────────────────────────── -# Preferred: ai_classify — cheap, no endpoint selection +# Preferred: ai_classify v2 — cheap, no endpoint selection, up to 500 labels @dlt.table(comment="Document type classification") def classified_docs(): @@ -124,13 +124,29 @@ def classified_docs(): dlt.read("raw_parsed") .withColumn( "doc_type", - expr("ai_classify(text_blocks, array('invoice', 'purchase_order', 'receipt', 'contract', 'other'))") + expr(""" + ai_classify( + text_blocks, + '["invoice", "purchase_order", "receipt", "contract", "other"]' + ):response[0] + """) ) ) # ── Stage 3a: Flat field extraction ────────────────────────────────────────── -# Preferred: ai_extract for flat fields (vendor, date, total) +# Preferred: ai_extract v2 — typed schema improves accuracy + +HEADER_SCHEMA = '''{ + "type": "object", + "properties": { + "invoice_number": {"type": "string"}, + "vendor_name": {"type": "string", "description": "Company or supplier name"}, + "issue_date": {"type": "string", "description": "Date in dd/mm/yyyy format"}, + "total_amount": {"type": "number"}, + "tax_id": {"type": "string", "description": "Tax ID, digits only"} + } +}''' @dlt.table(comment="Flat header fields extracted from documents") def extracted_flat(): @@ 
-139,41 +155,60 @@ def extracted_flat(): .filter("doc_type = 'invoice'") .withColumn( "header", - expr("ai_extract(text_blocks, array('invoice_number', 'vendor_name', 'issue_date', 'total_amount', 'tax_id'))") + expr(f"ai_extract(text_blocks, '{HEADER_SCHEMA.strip()}')") ) .select("path", "doc_type", "text_blocks", col("header")) ) -# ── Stage 3b: Nested JSON extraction (last resort: ai_query) ───────────────── -# Use ai_query only because line_items is a nested array — ai_extract can't handle it - -@dlt.table(comment="Nested line items extracted — ai_query used for array schema only") +# ── Stage 3b: Nested field extraction ──────────────────────────────────────── +# ai_extract v2 handles nested arrays (up to 7 levels, 128 fields) + +LINE_ITEMS_SCHEMA = '''{ + "type": "object", + "properties": { + "line_items": { + "type": "array", + "items": { + "type": "object", + "properties": { + "item_code": {"type": "string"}, + "description": {"type": "string"}, + "quantity": {"type": "number"}, + "unit_price": {"type": "number"}, + "total": {"type": "number"} + } + } + } + } +}''' + +@dlt.table(comment="Nested line items extracted using ai_extract v2") def extracted_line_items(): return ( dlt.read("extracted_flat") .withColumn( - "ai_response", - expr(f""" - ai_query( - '{ENDPOINT}', - concat('{PROMPT.strip()}', '\\n\\nDocument text:\\n', LEFT(text_blocks, 6000)), - responseFormat => '{{"type":"json_object"}}', - failOnError => false - ) - """) + "line_items_raw", + expr(f"ai_extract(text_blocks, '{LINE_ITEMS_SCHEMA.strip()}')") ) - .withColumn( - "line_items", - from_json( - col("ai_response.response"), - "STRUCT>>" - ) + .select( + "path", "doc_type", "header", + expr("line_items_raw:response.line_items").alias("line_items"), + expr("line_items_raw:error_message").alias("extraction_error"), ) - .select("path", "doc_type", "header", "line_items", col("ai_response.error").alias("extraction_error")) ) +# ── Alternative: ai_query (use when exceeding ai_extract v2 limits) 
───────── +# If the schema exceeds 128 fields or 7 nesting levels, or you need a custom +# model endpoint or multimodal input, fall back to ai_query: +# +# .withColumn("ai_response", expr(f""" +# ai_query('{ENDPOINT}', +# concat('{PROMPT.strip()}', '\\n\\nDocument text:\\n', LEFT(text_blocks, 6000)), +# responseFormat => '{{"type":"json_object"}}', +# failOnError => false) +# """)) + # ── Stage 4: Similarity matching ───────────────────────────────────────────── # Preferred: ai_similarity for fuzzy matching between extracted fields @@ -188,7 +223,7 @@ def vendor_matched(): extracted.crossJoin(vendors) .withColumn( "name_similarity", - expr("ai_similarity(header.vendor_name, vendor_name)") + expr("ai_similarity(header:response.vendor_name, vendor_name)") ) .filter("name_similarity > 0.80") .orderBy("name_similarity", ascending=False) @@ -208,11 +243,11 @@ def processed_docs(): .selectExpr( "path", "doc_type", - "header.invoice_number", - "header.vendor_name", - "header.issue_date", - "header.total_amount", - "line_items.line_items AS items", + "header:response.invoice_number", + "header:response.vendor_name", + "header:response.issue_date", + "header:response.total_amount", + "line_items AS items", ) ) @@ -463,7 +498,7 @@ with mlflow.start_run(): ## Tips 1. **Parse first, enrich second** — always run `ai_parse_document` as the first stage. Feed its text output to task-specific functions; never pass raw binary to `ai_query`. -2. **Flat fields → `ai_extract`; nested arrays → `ai_query`** — this is the clearest decision boundary. +2. **Structured extraction (flat or nested) → `ai_extract` v2; extreme complexity → `ai_query`** — `ai_extract` v2 handles nested arrays (up to 7 levels, 128 fields). Fall back to `ai_query` only for custom models, multimodal input, or schemas exceeding these limits. 3. **`failOnError => false` is mandatory in batch** — write errors to a sidecar `_errors` table rather than crashing the pipeline. 4. 
**Truncate before sending to `ai_query`** — use `LEFT(text, 6000)` or chunk long documents to stay within context window limits. 5. **Prompts belong in `config.yml`** — never hardcode prompt strings in pipeline code. A prompt change should be a config change, not a code change. diff --git a/databricks-skills/databricks-ai-functions/SKILL.md b/databricks-skills/databricks-ai-functions/SKILL.md index e3fc3fbb..0e1faf3e 100644 --- a/databricks-skills/databricks-ai-functions/SKILL.md +++ b/databricks-skills/databricks-ai-functions/SKILL.md @@ -25,8 +25,8 @@ There are three categories: | Task | Use this | Fall back to `ai_query` when... | |---|---|---| | Sentiment scoring | `ai_analyze_sentiment` | Never | -| Fixed-label routing | `ai_classify` (2–20 labels) | Never | -| Flat entity extraction | `ai_extract` | Output schema has nested arrays | +| Fixed-label routing | `ai_classify` (2–500 labels, multi-label supported) | Need custom model control | +| Structured extraction (flat or nested) | `ai_extract` (JSON schema, up to 7 levels) | >128 fields, >7 nesting levels, or custom model/multimodal | | Summarization | `ai_summarize` | Never — use `max_words=0` for uncapped | | Grammar correction | `ai_fix_grammar` | Never | | Translation | `ai_translate` | Target language not in the supported list | @@ -34,7 +34,7 @@ There are three categories: | Free-form generation | `ai_gen` | Need structured JSON output | | Semantic similarity | `ai_similarity` | Never | | PDF / document parsing | `ai_parse_document` | Need image-level reasoning | -| Complex JSON / reasoning | — | **This is the intended use case for `ai_query`** | +| Extreme complexity / custom model / multimodal | — | **>128 fields, >7 levels, custom endpoints, or image input — use `ai_query`** | ## Prerequisites @@ -47,30 +47,32 @@ There are three categories: ## Quick Start -Classify, extract, and score sentiment from a text column in a single query: +Classify, extract, and score sentiment from a text column in a 
single query (v2 syntax): ```sql SELECT ticket_id, ticket_text, - ai_classify(ticket_text, ARRAY('urgent', 'not urgent', 'spam')) AS priority, - ai_extract(ticket_text, ARRAY('product', 'error_code', 'date')) AS entities, - ai_analyze_sentiment(ticket_text) AS sentiment + ai_classify(ticket_text, '["urgent", "not urgent", "spam"]'):response[0] AS priority, + ai_extract(ticket_text, '["product", "error_code", "date"]'):response AS entities, + ai_analyze_sentiment(ticket_text) AS sentiment FROM support_tickets; ``` +> **V2 returns VARIANT** — access fields with `:response.field` (SQL) or `["response"]["field"]` (Python). V1 syntax (`ARRAY(...)` argument) still works but returns STRUCT with dot notation. + ```python from pyspark.sql.functions import expr df = spark.table("support_tickets") df = ( - df.withColumn("priority", expr("ai_classify(ticket_text, array('urgent', 'not urgent', 'spam'))")) - .withColumn("entities", expr("ai_extract(ticket_text, array('product', 'error_code', 'date'))")) + df.withColumn("priority", expr("ai_classify(ticket_text, '[\"urgent\", \"not urgent\", \"spam\"]'):response[0]")) + .withColumn("entities", expr("ai_extract(ticket_text, '[\"product\", \"error_code\", \"date\"]')")) .withColumn("sentiment", expr("ai_analyze_sentiment(ticket_text)")) ) -# Access nested STRUCT fields from ai_extract +# Access VARIANT fields from ai_extract v2 df.select("ticket_id", "priority", "sentiment", - "entities.product", "entities.error_code", "entities.date").display() + "entities:response.product", "entities:response.error_code", "entities:response.date").display() ``` ## Common Patterns @@ -86,7 +88,7 @@ SELECT ai_analyze_sentiment(content) AS sentiment, ai_summarize(content, 30) AS summary, ai_classify(content, - ARRAY('technical', 'billing', 'other')) AS category, + '["technical", "billing", "other"]'):response[0] AS category, ai_fix_grammar(content) AS content_clean FROM raw_feedback; ``` @@ -122,10 +124,49 @@ df = ( "parsed:error AS parse_error") 
.filter("parse_error IS NULL") .withColumn("summary", expr("ai_summarize(text_blocks, 50)")) - .withColumn("entities", expr("ai_extract(text_blocks, array('date', 'amount', 'vendor'))")) + .withColumn("entities", expr("ai_extract(text_blocks, '[\"date\", \"amount\", \"vendor\"]')")) ) ``` +### Pattern 3b: V2 Composable Chaining — VARIANT Flows Directly Between Functions + +V2 `ai_classify` and `ai_extract` accept VARIANT input, so the output of `ai_parse_document` can flow directly into them without extracting text first. This avoids the intermediate `selectExpr` step and lets the functions operate on the full document structure: + +```python +from pyspark.sql.functions import expr + +df = ( + spark.read.format("binaryFile") + .load("/Volumes/catalog/schema/landing/documents/") + # Stage 1: parse — returns VARIANT + .withColumn("parsed", expr("ai_parse_document(content)")) + # Stage 2: classify — accepts VARIANT directly from ai_parse_document + .withColumn("doc_type", expr(""" + ai_classify(parsed, '["invoice", "contract", "report", "other"]'):response[0] + """)) + # Stage 3: extract — accepts VARIANT directly from ai_parse_document + .withColumn("entities", expr(""" + ai_extract(parsed, '{ + "type": "object", + "properties": { + "date": {"type": "string"}, + "amount": {"type": "number"}, + "vendor": {"type": "string"} + } + }') + """)) + .select( + "path", + "doc_type", + "entities:response.date", + "entities:response.amount", + "entities:response.vendor", + ) +) +``` + +> This chaining pattern is only possible with v2 syntax. V1 `ai_classify` and `ai_extract` require STRING input and cannot accept the VARIANT output from `ai_parse_document`. 
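Reviewer note on the pattern above: v2 label lists and schemas are JSON strings embedded inside SQL single-quoted literals, so hand-escaping quotes in `expr(...)` gets error-prone as label sets grow. A minimal sketch (plain Python, no Spark session required; the column name `parsed` is carried over from the example above) of building the expression with `json.dumps`, which always emits double-quoted JSON that cannot collide with the SQL single-quote delimiter:

```python
import json

# Hypothetical label set — json.dumps produces a valid JSON array literal
# using double quotes, safe to embed inside a single-quoted SQL string.
labels = ["invoice", "contract", "report", "other"]
labels_json = json.dumps(labels)

# Assemble the SQL expression passed to expr(...) in the pattern above.
classify_expr = f"ai_classify(parsed, '{labels_json}'):response[0]"
print(classify_expr)
```

One sharp edge remains: a label containing a single quote would still terminate the SQL literal early, so keep labels to plain identifiers or escape them before embedding.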
+ ### Pattern 4: Semantic Matching / Deduplication ```sql @@ -136,9 +177,9 @@ JOIN companies b ON a.id < b.id WHERE ai_similarity(a.name, b.name) > 0.85; ``` -### Pattern 5: Complex JSON Extraction with `ai_query` (last resort) +### Pattern 5: Complex JSON Extraction with `ai_query` (when `ai_extract` v2 limits are exceeded) -Use only when the output schema has nested arrays or requires multi-step reasoning that no task-specific function handles: +Use when extraction exceeds 128 fields or 7 nesting levels, requires a custom model endpoint, or involves multimodal input. For most nested extraction including line-item arrays, prefer `ai_extract` v2 with a JSON schema — see [1-task-functions.md](1-task-functions.md#ai_extract). ```python from pyspark.sql.functions import expr, from_json, col @@ -189,7 +230,10 @@ FROM ai_forecast( | `ai_forecast` fails | Requires **Pro or Serverless** SQL warehouse — not available on Classic or Starter. | | All functions return NULL | Input column is NULL. Filter with `WHERE col IS NOT NULL` before calling. | | `ai_translate` fails for a language | Supported: English, German, French, Italian, Portuguese, Hindi, Spanish, Thai. Use `ai_query` with a multilingual model for others. | -| `ai_classify` returns unexpected labels | Use clear, mutually exclusive label names. Fewer labels (2–5) produces more reliable results. | +| `ai_classify` returns unexpected labels | Use clear, mutually exclusive label names. Fewer labels (2–5) is optimal for single-label. V2 supports up to 500 labels and label descriptions (`'{"label":"description"}'` format) for better disambiguation. | +| `ai_extract` v2 returns VARIANT, not STRUCT | V2 uses `:response.field` path notation. V1 `ARRAY(...)` syntax still returns STRUCT with dot notation. Use JSON string schema to get v2 behavior. | +| `ai_classify` v2 returns array in response | V2 returns `{"response": ["label"], ...}`. Access with `:response[0]` for single-label, or iterate `:response` for multi-label. 
| +| Need multi-label classification | Use `ai_classify` v2 with `map('multilabel', 'true')` in options. Returns all applicable labels in the response array. | | `ai_query` raises on some rows in a batch job | Add `failOnError => false` — returns a STRUCT with `.response` and `.error` instead of raising. | | Batch job runs slowly | Use DBR **15.4 ML LTS** cluster (not serverless or interactive) for optimized batch inference throughput. | | Want to swap models without editing pipeline code | Store all model names and prompts in `config.yml` — see [4-document-processing-pipeline.md](4-document-processing-pipeline.md) for the pattern. |
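Reviewer note: the v2 `{"response": ..., "error_message": ...}` envelope described in the troubleshooting rows above is worth unwrapping in one place when results are collected off-cluster. A sketch (plain Python; assumes the VARIANT cell was serialized to a JSON string, e.g. via `to_json`, and the helper name is hypothetical):

```python
import json

def unwrap_v2(cell):
    """Split an ai_classify/ai_extract v2 envelope into (payload, error)."""
    if cell is None:
        return None, None  # NULL input rows propagate as NULL
    envelope = json.loads(cell)
    error = envelope.get("error_message")
    if error:
        return None, error
    return envelope.get("response"), None

labels, err = unwrap_v2('{"response": ["urgent"], "error_message": null}')
print(labels[0], err)  # single-label: first element of the response array
```

This keeps the `:response[0]` vs `error_message` branching out of downstream code, mirroring the sidecar-errors advice in the batch tips.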