diff --git a/docs/README.instructions.md b/docs/README.instructions.md
index 0590fe1ff..9a6717393 100644
--- a/docs/README.instructions.md
+++ b/docs/README.instructions.md
@@ -175,6 +175,7 @@ See [CONTRIBUTING.md](../CONTRIBUTING.md#adding-instructions) for guidelines on
| [Ruby on Rails](../instructions/ruby-on-rails.instructions.md)
[](https://aka.ms/awesome-copilot/install/instructions?url=vscode%3Achat-instructions%2Finstall%3Furl%3Dhttps%3A%2F%2Fraw.githubusercontent.com%2Fgithub%2Fawesome-copilot%2Fmain%2Finstructions%2Fruby-on-rails.instructions.md)
[](https://aka.ms/awesome-copilot/install/instructions?url=vscode-insiders%3Achat-instructions%2Finstall%3Furl%3Dhttps%3A%2F%2Fraw.githubusercontent.com%2Fgithub%2Fawesome-copilot%2Fmain%2Finstructions%2Fruby-on-rails.instructions.md) | Ruby on Rails coding conventions and guidelines |
| [Rust Coding Conventions and Best Practices](../instructions/rust.instructions.md)
[](https://aka.ms/awesome-copilot/install/instructions?url=vscode%3Achat-instructions%2Finstall%3Furl%3Dhttps%3A%2F%2Fraw.githubusercontent.com%2Fgithub%2Fawesome-copilot%2Fmain%2Finstructions%2Frust.instructions.md)
[](https://aka.ms/awesome-copilot/install/instructions?url=vscode-insiders%3Achat-instructions%2Finstall%3Furl%3Dhttps%3A%2F%2Fraw.githubusercontent.com%2Fgithub%2Fawesome-copilot%2Fmain%2Finstructions%2Frust.instructions.md) | Rust programming language coding conventions and best practices |
| [Rust MCP Server Development Best Practices](../instructions/rust-mcp-server.instructions.md)
[](https://aka.ms/awesome-copilot/install/instructions?url=vscode%3Achat-instructions%2Finstall%3Furl%3Dhttps%3A%2F%2Fraw.githubusercontent.com%2Fgithub%2Fawesome-copilot%2Fmain%2Finstructions%2Frust-mcp-server.instructions.md)
[](https://aka.ms/awesome-copilot/install/instructions?url=vscode-insiders%3Achat-instructions%2Finstall%3Furl%3Dhttps%3A%2F%2Fraw.githubusercontent.com%2Fgithub%2Fawesome-copilot%2Fmain%2Finstructions%2Frust-mcp-server.instructions.md) | Best practices for building Model Context Protocol servers in Rust using the official rmcp SDK with async/await patterns |
+| [Scala + Apache Spark Best Practices](../instructions/scala-spark.instructions.md)
[](https://aka.ms/awesome-copilot/install/instructions?url=vscode%3Achat-instructions%2Finstall%3Furl%3Dhttps%3A%2F%2Fraw.githubusercontent.com%2Fgithub%2Fawesome-copilot%2Fmain%2Finstructions%2Fscala-spark.instructions.md)
[](https://aka.ms/awesome-copilot/install/instructions?url=vscode-insiders%3Achat-instructions%2Finstall%3Furl%3Dhttps%3A%2F%2Fraw.githubusercontent.com%2Fgithub%2Fawesome-copilot%2Fmain%2Finstructions%2Fscala-spark.instructions.md) | Best practices for building Apache Spark applications in Scala, covering DataFrames, Datasets, SparkSQL, performance tuning, testing, and production deployment patterns. |
| [Scala Best Practices](../instructions/scala2.instructions.md)
[](https://aka.ms/awesome-copilot/install/instructions?url=vscode%3Achat-instructions%2Finstall%3Furl%3Dhttps%3A%2F%2Fraw.githubusercontent.com%2Fgithub%2Fawesome-copilot%2Fmain%2Finstructions%2Fscala2.instructions.md)
[](https://aka.ms/awesome-copilot/install/instructions?url=vscode-insiders%3Achat-instructions%2Finstall%3Furl%3Dhttps%3A%2F%2Fraw.githubusercontent.com%2Fgithub%2Fawesome-copilot%2Fmain%2Finstructions%2Fscala2.instructions.md) | Scala 2.12/2.13 programming language coding conventions and best practices following Databricks style guide for functional programming, type safety, and production code quality. |
| [Security Standards](../instructions/security-and-owasp.instructions.md)
[](https://aka.ms/awesome-copilot/install/instructions?url=vscode%3Achat-instructions%2Finstall%3Furl%3Dhttps%3A%2F%2Fraw.githubusercontent.com%2Fgithub%2Fawesome-copilot%2Fmain%2Finstructions%2Fsecurity-and-owasp.instructions.md)
[](https://aka.ms/awesome-copilot/install/instructions?url=vscode-insiders%3Achat-instructions%2Finstall%3Furl%3Dhttps%3A%2F%2Fraw.githubusercontent.com%2Fgithub%2Fawesome-copilot%2Fmain%2Finstructions%2Fsecurity-and-owasp.instructions.md) | Comprehensive secure coding standards based on OWASP Top 10 2025, with 55+ anti-patterns, detection regex, framework-specific fixes for modern web and backend frameworks, and AI/LLM security guidance. |
| [Self-explanatory Code Commenting Instructions](../instructions/self-explanatory-code-commenting.instructions.md)
[](https://aka.ms/awesome-copilot/install/instructions?url=vscode%3Achat-instructions%2Finstall%3Furl%3Dhttps%3A%2F%2Fraw.githubusercontent.com%2Fgithub%2Fawesome-copilot%2Fmain%2Finstructions%2Fself-explanatory-code-commenting.instructions.md)
[](https://aka.ms/awesome-copilot/install/instructions?url=vscode-insiders%3Achat-instructions%2Finstall%3Furl%3Dhttps%3A%2F%2Fraw.githubusercontent.com%2Fgithub%2Fawesome-copilot%2Fmain%2Finstructions%2Fself-explanatory-code-commenting.instructions.md) | Guidelines for GitHub Copilot to write comments to achieve self-explanatory code with less comments. Examples are in JavaScript but it should work on any language that has comments. |
diff --git a/instructions/scala-spark.instructions.md b/instructions/scala-spark.instructions.md
new file mode 100644
index 000000000..3245174cb
--- /dev/null
+++ b/instructions/scala-spark.instructions.md
@@ -0,0 +1,531 @@
+---
+description: 'Best practices for building Apache Spark applications in Scala, covering DataFrames, Datasets, SparkSQL, performance tuning, testing, and production deployment patterns.'
+applyTo: '**/*.scala, **/build.sbt, **/build.sc'
+---
+
+# Scala + Apache Spark Best Practices
+
+Guidelines for writing efficient, maintainable, and production-ready Apache Spark applications in Scala.
+
+## Dependencies
+
+### SBT
+
+```scala
+val sparkVersion = "3.5.1"
+
+libraryDependencies ++= Seq(
+ "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
+ "org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
+ "org.apache.spark" %% "spark-mllib" % sparkVersion % "provided",
+ "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided"
+)
+```
+
+### Maven
+
+```xml
+
+ 3.5.1
+ 2.13
+
+
+
+
+ org.apache.spark
+ spark-core_${scala.binary.version}
+ ${spark.version}
+ provided
+
+
+ org.apache.spark
+ spark-sql_${scala.binary.version}
+ ${spark.version}
+ provided
+
+
+ org.apache.spark
+ spark-mllib_${scala.binary.version}
+ ${spark.version}
+ provided
+
+
+ org.apache.spark
+ spark-streaming_${scala.binary.version}
+ ${spark.version}
+ provided
+
+
+```
+
+Mark Spark dependencies as `"provided"` since the cluster supplies them at runtime. Only bundle application-specific libraries in the fat JAR.
+
+## SparkSession Setup
+
+Always use `SparkSession` as the single entry point:
+
+```scala
+import org.apache.spark.sql.SparkSession
+
+val spark: SparkSession = SparkSession.builder()
+ .appName("MyApplication")
+ .config("spark.sql.shuffle.partitions", "200")
+ .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+ .getOrCreate()
+
+import spark.implicits._
+```
+
+- Do **not** create multiple `SparkSession` instances in the same JVM.
+- Avoid hardcoding `master` in application code; set it at submit time via `--master`.
+
+## DataFrames vs Datasets vs RDDs
+
+Prefer the **DataFrame API** (untyped `Dataset[Row]`) for most workloads. Use **Datasets** (typed) when compile-time type safety justifies the serialization overhead. Avoid raw **RDDs** unless you need low-level control.
+
+```scala
+import org.apache.spark.sql.{DataFrame, Dataset}
+
+// Preferred — DataFrame API
+val df: DataFrame = spark.read.parquet("data/events")
+val result = df
+ .filter($"status" === "active")
+ .groupBy($"region")
+ .agg(count("*").as("total"))
+
+// Typed Dataset — use when schema safety matters
+case class Event(id: Long, status: String, region: String)
+val ds: Dataset[Event] = df.as[Event]
+val active = ds.filter(_.status == "active")
+```
+
+## Schema Management
+
+Always define schemas explicitly when reading semi-structured data instead of relying on schema inference:
+
+```scala
+import org.apache.spark.sql.types._
+
+val schema = StructType(Seq(
+ StructField("id", LongType, nullable = false),
+ StructField("name", StringType, nullable = true),
+ StructField("timestamp", TimestampType, nullable = false),
+ StructField("amount", DecimalType(18, 2), nullable = true),
+ StructField("tags", ArrayType(StringType), nullable = true)
+))
+
+val df = spark.read
+ .schema(schema)
+ .json("data/events/*.json")
+```
+
+- Schema inference (`inferSchema=true`) reads the entire data source and is expensive for large files.
+- For Parquet and Delta, the schema is embedded — explicit definition is unnecessary.
+
+## Column Expressions
+
+Prefer `col()` or `$""` over string column names in transformations for early error detection:
+
+```scala
+import org.apache.spark.sql.functions._
+
+// Good — type-checked column references
+df.select(col("name"), $"amount" * 1.1 as "adjusted_amount")
+
+// Avoid — string-only references delay errors to runtime
+df.select("name", "amount")
+```
+
+## Joins
+
+### Broadcast Joins
+
+Broadcast the smaller side of a join when it fits in executor memory (typically < 100 MB):
+
+```scala
+import org.apache.spark.sql.functions.broadcast
+
+val enriched = largeDF.join(
+ broadcast(smallLookupDF),
+ Seq("key"),
+ "left"
+)
+```
+
+### Avoiding Cartesian Products
+
+Never use cross joins unless intentional. Enable the safeguard:
+
+```scala
+spark.conf.set("spark.sql.crossJoin.enabled", "false")
+```
+
+### Skew Handling
+
+For joins on skewed keys, salt the key to distribute load:
+
+```scala
+import org.apache.spark.sql.functions._
+
+val saltBuckets = 10
+val saltedLeft = leftDF.withColumn("salt", (rand() * saltBuckets).cast("int"))
+val saltedRight = rightDF
+ .crossJoin((0 until saltBuckets).toDF("salt"))
+
+val result = saltedLeft
+ .join(saltedRight, Seq("join_key", "salt"))
+ .drop("salt")
+```
+
+The tradeoff is that the right side grows by 10×, so this only works when the right side is reasonably small or the skew is severe enough to justify it. For Spark 3.x+, AQE's built-in skew join handling (`spark.sql.adaptive.skewJoin.enabled = true`) can do this automatically without manual salting.
+
+## Partitioning and Bucketing
+
+### Write Partitioning
+
+Partition output by high-cardinality filter columns (e.g., date):
+
+```scala
+df.write
+ .partitionBy("year", "month")
+ .mode("overwrite")
+ .parquet("output/events")
+```
+
+- Avoid partitioning on high-cardinality columns (e.g., user ID) which creates millions of small files.
+
+### Shuffle Partitions
+
+Tune `spark.sql.shuffle.partitions` based on data volume:
+
+```scala
+// Default is 200; adjust based on data size
+// Rule of thumb: target 128 MB per partition
+spark.conf.set("spark.sql.shuffle.partitions", "400")
+```
+
+### Repartition vs Coalesce
+
+```scala
+// Repartition — full shuffle, use to increase or evenly distribute partitions
+df.repartition(100, $"key")
+
+// Coalesce — no shuffle, use only to reduce partition count
+df.coalesce(10)
+```
+
+Never use `coalesce(1)` on large datasets — it forces all data through a single task.
+
+## Caching and Persistence
+
+Cache only when a DataFrame is reused multiple times:
+
+```scala
+import org.apache.spark.storage.StorageLevel
+
+val cached = expensiveDF.persist(StorageLevel.MEMORY_AND_DISK)
+cached.count() // materialize the cache
+
+// Use cached DF multiple times
+val summary = cached.groupBy("region").count()
+val filtered = cached.filter($"amount" > 1000)
+
+// Always unpersist when done
+cached.unpersist()
+```
+
+- Prefer `MEMORY_AND_DISK` over `MEMORY_ONLY` to avoid recomputation on eviction.
+- Never cache DataFrames that are only used once.
+
+## UDFs — Use Sparingly
+
+Prefer built-in Spark SQL functions over UDFs. UDFs disable Catalyst optimizations and require serialization:
+
+```scala
+import org.apache.spark.sql.functions._
+
+// Good — use built-in functions
+df.withColumn("upper_name", upper($"name"))
+ .withColumn("name_length", length($"name"))
+
+// Avoid — UDF for something built-in functions handle
+val upperUdf = udf((s: String) => s.toUpperCase)
+df.withColumn("upper_name", upperUdf($"name"))
+```
+
+When a UDF is unavoidable, prefer `spark.udf.register` for SparkSQL compatibility, and handle nulls explicitly:
+
+```scala
+val parseStatus = udf((raw: String) => {
+ Option(raw).map(_.trim.toLowerCase) match {
+ case Some("active") | Some("enabled") => "ACTIVE"
+ case Some("inactive") | Some("disabled") => "INACTIVE"
+ case _ => "UNKNOWN"
+ }
+})
+```
+
+## Window Functions
+
+Use window functions for ranking, running totals, and lag/lead calculations:
+
+```scala
+import org.apache.spark.sql.expressions.Window
+
+val windowSpec = Window
+ .partitionBy("department")
+ .orderBy($"salary".desc)
+
+val ranked = df
+ .withColumn("rank", rank().over(windowSpec))
+ .withColumn("dense_rank", dense_rank().over(windowSpec))
+ .withColumn("row_number", row_number().over(windowSpec))
+ .withColumn("running_total", sum($"salary").over(
+ Window.partitionBy("department").orderBy("hire_date")
+ .rowsBetween(Window.unboundedPreceding, Window.currentRow)
+ ))
+```
+
+## Error Handling
+
+### Corrupt Record Handling
+
+```scala
+val df = spark.read
+ .option("mode", "PERMISSIVE") // default: keeps corrupt rows
+ .option("columnNameOfCorruptRecord", "_corrupt_record")
+ .schema(schema)
+ .json("data/events")
+
+val clean = df.filter($"_corrupt_record".isNull).drop("_corrupt_record")
+val bad = df.filter($"_corrupt_record".isNotNull)
+bad.write.json("data/quarantine")
+```
+
+### Accumulator-Based Error Counting
+
+```scala
+val parseErrors = spark.sparkContext.longAccumulator("parseErrors")
+
+val parsed = df.map { row =>
+ try {
+ parseRow(row)
+ } catch {
+ case _: Exception =>
+ parseErrors.add(1)
+ null
+ }
+}.filter(_ != null)
+
+println(s"Parse errors: ${parseErrors.value}")
+```
+
+> **Caveat:** Accumulators are only guaranteed accurate inside actions (`count`, `collect`, `write`). If tasks are retried due to failures, accumulators can over-count. For exact error tracking, prefer the quarantine pattern above; use accumulators for operational monitoring only.
+
+## Streaming (Structured Streaming)
+
+```scala
+val stream = spark.readStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", "broker:9092")
+ .option("subscribe", "events")
+ .option("startingOffsets", "latest")
+ .load()
+
+val parsed = stream
+ .selectExpr("CAST(value AS STRING) as json")
+ .select(from_json($"json", schema).as("data"))
+ .select("data.*")
+
+val query = parsed.writeStream
+ .format("delta")
+ .option("checkpointLocation", "/checkpoints/events")
+ .outputMode("append")
+ .trigger(Trigger.ProcessingTime("30 seconds"))
+ .start("output/events")
+
+query.awaitTermination()
+```
+
+- Always set a checkpoint location for fault tolerance.
+- Use `Trigger.ProcessingTime` or `Trigger.AvailableNow` — avoid `Trigger.Once` in production (use `AvailableNow` instead).
+
+## Delta Lake Integration
+
+```scala
+import io.delta.tables.DeltaTable
+
+// Upsert / merge
+val target = DeltaTable.forPath(spark, "data/customers")
+
+target.as("t")
+ .merge(updatesDF.as("s"), "t.id = s.id")
+ .whenMatched.updateAll()
+ .whenNotMatched.insertAll()
+ .execute()
+
+// Time travel
+val yesterday = spark.read
+ .format("delta")
+ .option("timestampAsOf", "2025-01-15")
+ .load("data/customers")
+
+// Optimize and vacuum
+target.optimize().executeCompaction()
+target.vacuum(168) // retain 7 days
+```
+
+## Performance Tuning Checklist
+
+1. **Minimize shuffles** — use `broadcast` joins, pre-partition data, avoid unnecessary `groupBy`.
+2. **Avoid `collect()` on large DataFrames** — it pulls all data to the driver.
+3. **Prefer `explain(true)`** to inspect physical plans before running expensive jobs.
+4. **Enable Adaptive Query Execution (AQE)**:
+ ```scala
+ spark.conf.set("spark.sql.adaptive.enabled", "true")
+ spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
+ spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
+ ```
+5. **Use columnar formats** (Parquet, Delta, ORC) over CSV/JSON for analytical workloads.
+6. **Predicate pushdown** — filter early in the query plan; place filters before joins.
+7. **Column pruning** — `select` only needed columns instead of `select("*")`.
+8. **Avoid `distinct()` before `groupBy`** — the aggregation already deduplicates.
+
+## Testing
+
+### Unit Testing Transformations
+
+Test pure transformation functions without a SparkSession when possible:
+
+```scala
+import org.scalatest.funsuite.AnyFunSuite
+
+class TransformationsTest extends AnyFunSuite {
+ test("parseStatus maps known values correctly") {
+ assert(parseStatus("active") == "ACTIVE")
+ assert(parseStatus("DISABLED") == "INACTIVE")
+ assert(parseStatus(null) == "UNKNOWN")
+ }
+}
+```
+
+### Integration Testing with SparkSession
+
+Use a shared `SparkSession` for DataFrame-level tests:
+
+```scala
+import org.apache.spark.sql.SparkSession
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.funsuite.AnyFunSuite
+
+trait SparkTestBase extends AnyFunSuite with BeforeAndAfterAll {
+ lazy val spark: SparkSession = SparkSession.builder()
+ .master("local[2]")
+ .appName("test")
+ .config("spark.sql.shuffle.partitions", "2")
+ .getOrCreate()
+
+ override def afterAll(): Unit = {
+ spark.stop()
+ super.afterAll()
+ }
+}
+
+class EventPipelineTest extends SparkTestBase {
+ import spark.implicits._
+
+ test("pipeline filters inactive events") {
+ val input = Seq(
+ Event(1L, "active", "US"),
+ Event(2L, "inactive", "EU")
+ ).toDS()
+
+ val result = filterActive(input)
+ assert(result.count() == 1)
+ assert(result.collect().head.status == "active")
+ }
+}
+```
+
+## Application Packaging
+
+### Fat JAR with sbt-assembly
+
+```scala
+// project/plugins.sbt
+addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.1.5")
+
+// build.sbt
+assembly / assemblyMergeStrategy := {
+ case PathList("META-INF", _*) => MergeStrategy.discard
+ case _ => MergeStrategy.first
+}
+```
+
+### Spark Submit
+
+```bash
+spark-submit \
+ --class com.example.MainApp \
+ --master yarn \
+ --deploy-mode cluster \
+ --num-executors 10 \
+ --executor-memory 8g \
+ --executor-cores 4 \
+ --conf spark.sql.adaptive.enabled=true \
+ --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
+ target/scala-2.13/my-app-assembly-1.0.jar \
+ --input s3://bucket/input \
+ --output s3://bucket/output
+```
+
+## Common Anti-Patterns
+
+| Anti-Pattern | Why It's Bad | Fix |
+|---|---|---|
+| `collect()` on large data | OOM on driver | Use `take(n)`, `show()`, or write to storage |
+| `count()` inside loops | Triggers full DAG evaluation each time | Cache and count once |
+| UDF for built-in operations | Disables Catalyst optimizer | Use `org.apache.spark.sql.functions._` |
+| `var` for DataFrames | Mutable references cause confusion | Chain transformations or use `val` |
+| Schema inference on CSV/JSON | Reads entire source, fragile | Define `StructType` explicitly |
+| `coalesce(1)` on large data | Single-task bottleneck | Use `repartition` with reasonable count |
+| Nested `map` on RDDs | Quadratic complexity | Use `join` or `broadcast` |
+| Ignoring data skew | Straggler tasks, OOM | Salt keys or use AQE skew handling |
+
+## Dynamic Allocation
+
+Enable dynamic allocation to let Spark scale executors up and down based on workload demand. This is essential for shared clusters where fixed executor counts waste resources during idle stages:
+
+```scala
+spark.conf.set("spark.dynamicAllocation.enabled", "true")
+spark.conf.set("spark.dynamicAllocation.minExecutors", "2")
+spark.conf.set("spark.dynamicAllocation.maxExecutors", "50")
+spark.conf.set("spark.dynamicAllocation.initialExecutors", "5")
+spark.conf.set("spark.dynamicAllocation.executorIdleTimeout", "60s")
+spark.conf.set("spark.dynamicAllocation.schedulerBacklogTimeout", "1s")
+```
+
+Or via `spark-submit`:
+
+```bash
+spark-submit \
+ --conf spark.dynamicAllocation.enabled=true \
+ --conf spark.dynamicAllocation.minExecutors=2 \
+ --conf spark.dynamicAllocation.maxExecutors=50 \
+ --conf spark.shuffle.service.enabled=true \
+ ...
+```
+
+Key settings:
+
+| Setting | Purpose |
+|---|---|
+| `minExecutors` | Floor — always keep at least this many executors running |
+| `maxExecutors` | Ceiling — cap to prevent monopolizing the cluster |
+| `initialExecutors` | Starting count before auto-scaling kicks in |
+| `executorIdleTimeout` | Remove idle executors after this duration (default 60s) |
+| `schedulerBacklogTimeout` | Request new executors when tasks have been pending this long |
+
+- **Requires `spark.shuffle.service.enabled=true`** on YARN/Mesos — an external shuffle service preserves shuffle files after executors are removed. Without it, removed executors lose their shuffle data, forcing costly recomputation.
+- On **Kubernetes**, use `spark.dynamicAllocation.shuffleTracking.enabled=true` instead (no external shuffle service needed).
+- **Do not combine** `--num-executors` with dynamic allocation — they conflict. Remove `--num-executors` when enabling dynamic allocation.