diff --git a/docs/README.instructions.md b/docs/README.instructions.md index 0590fe1ff..9a6717393 100644 --- a/docs/README.instructions.md +++ b/docs/README.instructions.md @@ -175,6 +175,7 @@ See [CONTRIBUTING.md](../CONTRIBUTING.md#adding-instructions) for guidelines on | [Ruby on Rails](../instructions/ruby-on-rails.instructions.md)
[![Install in VS Code](https://img.shields.io/badge/VS_Code-Install-0098FF?style=flat-square&logo=visualstudiocode&logoColor=white)](https://aka.ms/awesome-copilot/install/instructions?url=vscode%3Achat-instructions%2Finstall%3Furl%3Dhttps%3A%2F%2Fraw.githubusercontent.com%2Fgithub%2Fawesome-copilot%2Fmain%2Finstructions%2Fruby-on-rails.instructions.md)
[![Install in VS Code Insiders](https://img.shields.io/badge/VS_Code_Insiders-Install-24bfa5?style=flat-square&logo=visualstudiocode&logoColor=white)](https://aka.ms/awesome-copilot/install/instructions?url=vscode-insiders%3Achat-instructions%2Finstall%3Furl%3Dhttps%3A%2F%2Fraw.githubusercontent.com%2Fgithub%2Fawesome-copilot%2Fmain%2Finstructions%2Fruby-on-rails.instructions.md) | Ruby on Rails coding conventions and guidelines | | [Rust Coding Conventions and Best Practices](../instructions/rust.instructions.md)
[![Install in VS Code](https://img.shields.io/badge/VS_Code-Install-0098FF?style=flat-square&logo=visualstudiocode&logoColor=white)](https://aka.ms/awesome-copilot/install/instructions?url=vscode%3Achat-instructions%2Finstall%3Furl%3Dhttps%3A%2F%2Fraw.githubusercontent.com%2Fgithub%2Fawesome-copilot%2Fmain%2Finstructions%2Frust.instructions.md)
[![Install in VS Code Insiders](https://img.shields.io/badge/VS_Code_Insiders-Install-24bfa5?style=flat-square&logo=visualstudiocode&logoColor=white)](https://aka.ms/awesome-copilot/install/instructions?url=vscode-insiders%3Achat-instructions%2Finstall%3Furl%3Dhttps%3A%2F%2Fraw.githubusercontent.com%2Fgithub%2Fawesome-copilot%2Fmain%2Finstructions%2Frust.instructions.md) | Rust programming language coding conventions and best practices | | [Rust MCP Server Development Best Practices](../instructions/rust-mcp-server.instructions.md)
[![Install in VS Code](https://img.shields.io/badge/VS_Code-Install-0098FF?style=flat-square&logo=visualstudiocode&logoColor=white)](https://aka.ms/awesome-copilot/install/instructions?url=vscode%3Achat-instructions%2Finstall%3Furl%3Dhttps%3A%2F%2Fraw.githubusercontent.com%2Fgithub%2Fawesome-copilot%2Fmain%2Finstructions%2Frust-mcp-server.instructions.md)
[![Install in VS Code Insiders](https://img.shields.io/badge/VS_Code_Insiders-Install-24bfa5?style=flat-square&logo=visualstudiocode&logoColor=white)](https://aka.ms/awesome-copilot/install/instructions?url=vscode-insiders%3Achat-instructions%2Finstall%3Furl%3Dhttps%3A%2F%2Fraw.githubusercontent.com%2Fgithub%2Fawesome-copilot%2Fmain%2Finstructions%2Frust-mcp-server.instructions.md) | Best practices for building Model Context Protocol servers in Rust using the official rmcp SDK with async/await patterns | +| [Scala + Apache Spark Best Practices](../instructions/scala-spark.instructions.md)
[![Install in VS Code](https://img.shields.io/badge/VS_Code-Install-0098FF?style=flat-square&logo=visualstudiocode&logoColor=white)](https://aka.ms/awesome-copilot/install/instructions?url=vscode%3Achat-instructions%2Finstall%3Furl%3Dhttps%3A%2F%2Fraw.githubusercontent.com%2Fgithub%2Fawesome-copilot%2Fmain%2Finstructions%2Fscala-spark.instructions.md)
[![Install in VS Code Insiders](https://img.shields.io/badge/VS_Code_Insiders-Install-24bfa5?style=flat-square&logo=visualstudiocode&logoColor=white)](https://aka.ms/awesome-copilot/install/instructions?url=vscode-insiders%3Achat-instructions%2Finstall%3Furl%3Dhttps%3A%2F%2Fraw.githubusercontent.com%2Fgithub%2Fawesome-copilot%2Fmain%2Finstructions%2Fscala-spark.instructions.md) | Best practices for building Apache Spark applications in Scala, covering DataFrames, Datasets, SparkSQL, performance tuning, testing, and production deployment patterns. | | [Scala Best Practices](../instructions/scala2.instructions.md)
[![Install in VS Code](https://img.shields.io/badge/VS_Code-Install-0098FF?style=flat-square&logo=visualstudiocode&logoColor=white)](https://aka.ms/awesome-copilot/install/instructions?url=vscode%3Achat-instructions%2Finstall%3Furl%3Dhttps%3A%2F%2Fraw.githubusercontent.com%2Fgithub%2Fawesome-copilot%2Fmain%2Finstructions%2Fscala2.instructions.md)
[![Install in VS Code Insiders](https://img.shields.io/badge/VS_Code_Insiders-Install-24bfa5?style=flat-square&logo=visualstudiocode&logoColor=white)](https://aka.ms/awesome-copilot/install/instructions?url=vscode-insiders%3Achat-instructions%2Finstall%3Furl%3Dhttps%3A%2F%2Fraw.githubusercontent.com%2Fgithub%2Fawesome-copilot%2Fmain%2Finstructions%2Fscala2.instructions.md) | Scala 2.12/2.13 programming language coding conventions and best practices following Databricks style guide for functional programming, type safety, and production code quality. | | [Security Standards](../instructions/security-and-owasp.instructions.md)
[![Install in VS Code](https://img.shields.io/badge/VS_Code-Install-0098FF?style=flat-square&logo=visualstudiocode&logoColor=white)](https://aka.ms/awesome-copilot/install/instructions?url=vscode%3Achat-instructions%2Finstall%3Furl%3Dhttps%3A%2F%2Fraw.githubusercontent.com%2Fgithub%2Fawesome-copilot%2Fmain%2Finstructions%2Fsecurity-and-owasp.instructions.md)
[![Install in VS Code Insiders](https://img.shields.io/badge/VS_Code_Insiders-Install-24bfa5?style=flat-square&logo=visualstudiocode&logoColor=white)](https://aka.ms/awesome-copilot/install/instructions?url=vscode-insiders%3Achat-instructions%2Finstall%3Furl%3Dhttps%3A%2F%2Fraw.githubusercontent.com%2Fgithub%2Fawesome-copilot%2Fmain%2Finstructions%2Fsecurity-and-owasp.instructions.md) | Comprehensive secure coding standards based on OWASP Top 10 2025, with 55+ anti-patterns, detection regex, framework-specific fixes for modern web and backend frameworks, and AI/LLM security guidance. | | [Self-explanatory Code Commenting Instructions](../instructions/self-explanatory-code-commenting.instructions.md)
[![Install in VS Code](https://img.shields.io/badge/VS_Code-Install-0098FF?style=flat-square&logo=visualstudiocode&logoColor=white)](https://aka.ms/awesome-copilot/install/instructions?url=vscode%3Achat-instructions%2Finstall%3Furl%3Dhttps%3A%2F%2Fraw.githubusercontent.com%2Fgithub%2Fawesome-copilot%2Fmain%2Finstructions%2Fself-explanatory-code-commenting.instructions.md)
[![Install in VS Code Insiders](https://img.shields.io/badge/VS_Code_Insiders-Install-24bfa5?style=flat-square&logo=visualstudiocode&logoColor=white)](https://aka.ms/awesome-copilot/install/instructions?url=vscode-insiders%3Achat-instructions%2Finstall%3Furl%3Dhttps%3A%2F%2Fraw.githubusercontent.com%2Fgithub%2Fawesome-copilot%2Fmain%2Finstructions%2Fself-explanatory-code-commenting.instructions.md) | Guidelines for GitHub Copilot to write comments to achieve self-explanatory code with less comments. Examples are in JavaScript but it should work on any language that has comments. | diff --git a/instructions/scala-spark.instructions.md b/instructions/scala-spark.instructions.md new file mode 100644 index 000000000..3245174cb --- /dev/null +++ b/instructions/scala-spark.instructions.md @@ -0,0 +1,531 @@ +--- +description: 'Best practices for building Apache Spark applications in Scala, covering DataFrames, Datasets, SparkSQL, performance tuning, testing, and production deployment patterns.' +applyTo: '**/*.scala, **/build.sbt, **/build.sc' +--- + +# Scala + Apache Spark Best Practices + +Guidelines for writing efficient, maintainable, and production-ready Apache Spark applications in Scala. + +## Dependencies + +### SBT + +```scala +val sparkVersion = "3.5.1" + +libraryDependencies ++= Seq( + "org.apache.spark" %% "spark-core" % sparkVersion % "provided", + "org.apache.spark" %% "spark-sql" % sparkVersion % "provided", + "org.apache.spark" %% "spark-mllib" % sparkVersion % "provided", + "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided" +) +``` + +### Maven + +```xml + + 3.5.1 + 2.13 + + + + + org.apache.spark + spark-core_${scala.binary.version} + ${spark.version} + provided + + + org.apache.spark + spark-sql_${scala.binary.version} + ${spark.version} + provided + + + org.apache.spark + spark-mllib_${scala.binary.version} + ${spark.version} + provided + + + org.apache.spark + spark-streaming_${scala.binary.version} + ${spark.version} + provided + + +``` + +Mark Spark dependencies as `"provided"` since the cluster supplies them at runtime. Only bundle application-specific libraries in the fat JAR. + +## SparkSession Setup + +Always use `SparkSession` as the single entry point: + +```scala +import org.apache.spark.sql.SparkSession + +val spark: SparkSession = SparkSession.builder() + .appName("MyApplication") + .config("spark.sql.shuffle.partitions", "200") + .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") + .getOrCreate() + +import spark.implicits._ +``` + +- Do **not** create multiple `SparkSession` instances in the same JVM. +- Avoid hardcoding `master` in application code; set it at submit time via `--master`. + +## DataFrames vs Datasets vs RDDs + +Prefer the **DataFrame API** (untyped `Dataset[Row]`) for most workloads. Use **Datasets** (typed) when compile-time type safety justifies the serialization overhead. Avoid raw **RDDs** unless you need low-level control. + +```scala +import org.apache.spark.sql.{DataFrame, Dataset} + +// Preferred — DataFrame API +val df: DataFrame = spark.read.parquet("data/events") +val result = df + .filter($"status" === "active") + .groupBy($"region") + .agg(count("*").as("total")) + +// Typed Dataset — use when schema safety matters +case class Event(id: Long, status: String, region: String) +val ds: Dataset[Event] = df.as[Event] +val active = ds.filter(_.status == "active") +``` + +## Schema Management + +Always define schemas explicitly when reading semi-structured data instead of relying on schema inference: + +```scala +import org.apache.spark.sql.types._ + +val schema = StructType(Seq( + StructField("id", LongType, nullable = false), + StructField("name", StringType, nullable = true), + StructField("timestamp", TimestampType, nullable = false), + StructField("amount", DecimalType(18, 2), nullable = true), + StructField("tags", ArrayType(StringType), nullable = true) +)) + +val df = spark.read + .schema(schema) + .json("data/events/*.json") +``` + +- Schema inference (`inferSchema=true`) reads the entire data source and is expensive for large files. +- For Parquet and Delta, the schema is embedded — explicit definition is unnecessary. + +## Column Expressions + +Prefer `col()` or `$""` over string column names in transformations for early error detection: + +```scala +import org.apache.spark.sql.functions._ + +// Good — type-checked column references +df.select(col("name"), $"amount" * 1.1 as "adjusted_amount") + +// Avoid — string-only references delay errors to runtime +df.select("name", "amount") +``` + +## Joins + +### Broadcast Joins + +Broadcast the smaller side of a join when it fits in executor memory (typically < 100 MB): + +```scala +import org.apache.spark.sql.functions.broadcast + +val enriched = largeDF.join( + broadcast(smallLookupDF), + Seq("key"), + "left" +) +``` + +### Avoiding Cartesian Products + +Never use cross joins unless intentional. Enable the safeguard: + +```scala +spark.conf.set("spark.sql.crossJoin.enabled", "false") +``` + +### Skew Handling + +For joins on skewed keys, salt the key to distribute load: + +```scala +import org.apache.spark.sql.functions._ + +val saltBuckets = 10 +val saltedLeft = leftDF.withColumn("salt", (rand() * saltBuckets).cast("int")) +val saltedRight = rightDF + .crossJoin((0 until saltBuckets).toDF("salt")) + +val result = saltedLeft + .join(saltedRight, Seq("join_key", "salt")) + .drop("salt") +``` + +The tradeoff is that the right side grows by 10×, so this only works when the right side is reasonably small or the skew is severe enough to justify it. For Spark 3.x+, AQE's built-in skew join handling (`spark.sql.adaptive.skewJoin.enabled = true`) can do this automatically without manual salting. + +## Partitioning and Bucketing + +### Write Partitioning + +Partition output by high-cardinality filter columns (e.g., date): + +```scala +df.write + .partitionBy("year", "month") + .mode("overwrite") + .parquet("output/events") +``` + +- Avoid partitioning on high-cardinality columns (e.g., user ID) which creates millions of small files. + +### Shuffle Partitions + +Tune `spark.sql.shuffle.partitions` based on data volume: + +```scala +// Default is 200; adjust based on data size +// Rule of thumb: target 128 MB per partition +spark.conf.set("spark.sql.shuffle.partitions", "400") +``` + +### Repartition vs Coalesce + +```scala +// Repartition — full shuffle, use to increase or evenly distribute partitions +df.repartition(100, $"key") + +// Coalesce — no shuffle, use only to reduce partition count +df.coalesce(10) +``` + +Never use `coalesce(1)` on large datasets — it forces all data through a single task. + +## Caching and Persistence + +Cache only when a DataFrame is reused multiple times: + +```scala +import org.apache.spark.storage.StorageLevel + +val cached = expensiveDF.persist(StorageLevel.MEMORY_AND_DISK) +cached.count() // materialize the cache + +// Use cached DF multiple times +val summary = cached.groupBy("region").count() +val filtered = cached.filter($"amount" > 1000) + +// Always unpersist when done +cached.unpersist() +``` + +- Prefer `MEMORY_AND_DISK` over `MEMORY_ONLY` to avoid recomputation on eviction. +- Never cache DataFrames that are only used once. + +## UDFs — Use Sparingly + +Prefer built-in Spark SQL functions over UDFs. UDFs disable Catalyst optimizations and require serialization: + +```scala +import org.apache.spark.sql.functions._ + +// Good — use built-in functions +df.withColumn("upper_name", upper($"name")) + .withColumn("name_length", length($"name")) + +// Avoid — UDF for something built-in functions handle +val upperUdf = udf((s: String) => s.toUpperCase) +df.withColumn("upper_name", upperUdf($"name")) +``` + +When a UDF is unavoidable, prefer `spark.udf.register` for SparkSQL compatibility, and handle nulls explicitly: + +```scala +val parseStatus = udf((raw: String) => { + Option(raw).map(_.trim.toLowerCase) match { + case Some("active") | Some("enabled") => "ACTIVE" + case Some("inactive") | Some("disabled") => "INACTIVE" + case _ => "UNKNOWN" + } +}) +``` + +## Window Functions + +Use window functions for ranking, running totals, and lag/lead calculations: + +```scala +import org.apache.spark.sql.expressions.Window + +val windowSpec = Window + .partitionBy("department") + .orderBy($"salary".desc) + +val ranked = df + .withColumn("rank", rank().over(windowSpec)) + .withColumn("dense_rank", dense_rank().over(windowSpec)) + .withColumn("row_number", row_number().over(windowSpec)) + .withColumn("running_total", sum($"salary").over( + Window.partitionBy("department").orderBy("hire_date") + .rowsBetween(Window.unboundedPreceding, Window.currentRow) + )) +``` + +## Error Handling + +### Corrupt Record Handling + +```scala +val df = spark.read + .option("mode", "PERMISSIVE") // default: keeps corrupt rows + .option("columnNameOfCorruptRecord", "_corrupt_record") + .schema(schema) + .json("data/events") + +val clean = df.filter($"_corrupt_record".isNull).drop("_corrupt_record") +val bad = df.filter($"_corrupt_record".isNotNull) +bad.write.json("data/quarantine") +``` + +### Accumulator-Based Error Counting + +```scala +val parseErrors = spark.sparkContext.longAccumulator("parseErrors") + +val parsed = df.map { row => + try { + parseRow(row) + } catch { + case _: Exception => + parseErrors.add(1) + null + } +}.filter(_ != null) + +println(s"Parse errors: ${parseErrors.value}") +``` + +> **Caveat:** Accumulators are only guaranteed accurate inside actions (`count`, `collect`, `write`). If tasks are retried due to failures, accumulators can over-count. For exact error tracking, prefer the quarantine pattern above; use accumulators for operational monitoring only. + +## Streaming (Structured Streaming) + +```scala +val stream = spark.readStream + .format("kafka") + .option("kafka.bootstrap.servers", "broker:9092") + .option("subscribe", "events") + .option("startingOffsets", "latest") + .load() + +val parsed = stream + .selectExpr("CAST(value AS STRING) as json") + .select(from_json($"json", schema).as("data")) + .select("data.*") + +val query = parsed.writeStream + .format("delta") + .option("checkpointLocation", "/checkpoints/events") + .outputMode("append") + .trigger(Trigger.ProcessingTime("30 seconds")) + .start("output/events") + +query.awaitTermination() +``` + +- Always set a checkpoint location for fault tolerance. +- Use `Trigger.ProcessingTime` or `Trigger.AvailableNow` — avoid `Trigger.Once` in production (use `AvailableNow` instead). + +## Delta Lake Integration + +```scala +import io.delta.tables.DeltaTable + +// Upsert / merge +val target = DeltaTable.forPath(spark, "data/customers") + +target.as("t") + .merge(updatesDF.as("s"), "t.id = s.id") + .whenMatched.updateAll() + .whenNotMatched.insertAll() + .execute() + +// Time travel +val yesterday = spark.read + .format("delta") + .option("timestampAsOf", "2025-01-15") + .load("data/customers") + +// Optimize and vacuum +target.optimize().executeCompaction() +target.vacuum(168) // retain 7 days +``` + +## Performance Tuning Checklist + +1. **Minimize shuffles** — use `broadcast` joins, pre-partition data, avoid unnecessary `groupBy`. +2. **Avoid `collect()` on large DataFrames** — it pulls all data to the driver. +3. **Prefer `explain(true)`** to inspect physical plans before running expensive jobs. +4. **Enable Adaptive Query Execution (AQE)**: + ```scala + spark.conf.set("spark.sql.adaptive.enabled", "true") + spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true") + spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true") + ``` +5. **Use columnar formats** (Parquet, Delta, ORC) over CSV/JSON for analytical workloads. +6. **Predicate pushdown** — filter early in the query plan; place filters before joins. +7. **Column pruning** — `select` only needed columns instead of `select("*")`. +8. **Avoid `distinct()` before `groupBy`** — the aggregation already deduplicates. + +## Testing + +### Unit Testing Transformations + +Test pure transformation functions without a SparkSession when possible: + +```scala +import org.scalatest.funsuite.AnyFunSuite + +class TransformationsTest extends AnyFunSuite { + test("parseStatus maps known values correctly") { + assert(parseStatus("active") == "ACTIVE") + assert(parseStatus("DISABLED") == "INACTIVE") + assert(parseStatus(null) == "UNKNOWN") + } +} +``` + +### Integration Testing with SparkSession + +Use a shared `SparkSession` for DataFrame-level tests: + +```scala +import org.apache.spark.sql.SparkSession +import org.scalatest.BeforeAndAfterAll +import org.scalatest.funsuite.AnyFunSuite + +trait SparkTestBase extends AnyFunSuite with BeforeAndAfterAll { + lazy val spark: SparkSession = SparkSession.builder() + .master("local[2]") + .appName("test") + .config("spark.sql.shuffle.partitions", "2") + .getOrCreate() + + override def afterAll(): Unit = { + spark.stop() + super.afterAll() + } +} + +class EventPipelineTest extends SparkTestBase { + import spark.implicits._ + + test("pipeline filters inactive events") { + val input = Seq( + Event(1L, "active", "US"), + Event(2L, "inactive", "EU") + ).toDS() + + val result = filterActive(input) + assert(result.count() == 1) + assert(result.collect().head.status == "active") + } +} +``` + +## Application Packaging + +### Fat JAR with sbt-assembly + +```scala +// project/plugins.sbt +addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.1.5") + +// build.sbt +assembly / assemblyMergeStrategy := { + case PathList("META-INF", _*) => MergeStrategy.discard + case _ => MergeStrategy.first +} +``` + +### Spark Submit + +```bash +spark-submit \ + --class com.example.MainApp \ + --master yarn \ + --deploy-mode cluster \ + --num-executors 10 \ + --executor-memory 8g \ + --executor-cores 4 \ + --conf spark.sql.adaptive.enabled=true \ + --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \ + target/scala-2.13/my-app-assembly-1.0.jar \ + --input s3://bucket/input \ + --output s3://bucket/output +``` + +## Common Anti-Patterns + +| Anti-Pattern | Why It's Bad | Fix | +|---|---|---| +| `collect()` on large data | OOM on driver | Use `take(n)`, `show()`, or write to storage | +| `count()` inside loops | Triggers full DAG evaluation each time | Cache and count once | +| UDF for built-in operations | Disables Catalyst optimizer | Use `org.apache.spark.sql.functions._` | +| `var` for DataFrames | Mutable references cause confusion | Chain transformations or use `val` | +| Schema inference on CSV/JSON | Reads entire source, fragile | Define `StructType` explicitly | +| `coalesce(1)` on large data | Single-task bottleneck | Use `repartition` with reasonable count | +| Nested `map` on RDDs | Quadratic complexity | Use `join` or `broadcast` | +| Ignoring data skew | Straggler tasks, OOM | Salt keys or use AQE skew handling | + +## Dynamic Allocation + +Enable dynamic allocation to let Spark scale executors up and down based on workload demand. This is essential for shared clusters where fixed executor counts waste resources during idle stages: + +```scala +spark.conf.set("spark.dynamicAllocation.enabled", "true") +spark.conf.set("spark.dynamicAllocation.minExecutors", "2") +spark.conf.set("spark.dynamicAllocation.maxExecutors", "50") +spark.conf.set("spark.dynamicAllocation.initialExecutors", "5") +spark.conf.set("spark.dynamicAllocation.executorIdleTimeout", "60s") +spark.conf.set("spark.dynamicAllocation.schedulerBacklogTimeout", "1s") +``` + +Or via `spark-submit`: + +```bash +spark-submit \ + --conf spark.dynamicAllocation.enabled=true \ + --conf spark.dynamicAllocation.minExecutors=2 \ + --conf spark.dynamicAllocation.maxExecutors=50 \ + --conf spark.shuffle.service.enabled=true \ + ... +``` + +Key settings: + +| Setting | Purpose | +|---|---| +| `minExecutors` | Floor — always keep at least this many executors running | +| `maxExecutors` | Ceiling — cap to prevent monopolizing the cluster | +| `initialExecutors` | Starting count before auto-scaling kicks in | +| `executorIdleTimeout` | Remove idle executors after this duration (default 60s) | +| `schedulerBacklogTimeout` | Request new executors when tasks have been pending this long | + +- **Requires `spark.shuffle.service.enabled=true`** on YARN/Mesos — an external shuffle service preserves shuffle files after executors are removed. Without it, removed executors lose their shuffle data, forcing costly recomputation. +- On **Kubernetes**, use `spark.dynamicAllocation.shuffleTracking.enabled=true` instead (no external shuffle service needed). +- **Do not combine** `--num-executors` with dynamic allocation — they conflict. Remove `--num-executors` when enabling dynamic allocation.