diff --git a/Directory.Packages.props b/Directory.Packages.props index 846804d5..2dc36e0b 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -19,8 +19,10 @@ + + diff --git a/cli/SimpleModule.Cli/Commands/Jobs/JobsListScheduledCommand.cs b/cli/SimpleModule.Cli/Commands/Jobs/JobsListScheduledCommand.cs new file mode 100644 index 00000000..523b160d --- /dev/null +++ b/cli/SimpleModule.Cli/Commands/Jobs/JobsListScheduledCommand.cs @@ -0,0 +1,184 @@ +using System.Data.Common; +using System.Globalization; +using System.Text.Json; +using Microsoft.Data.Sqlite; +using Npgsql; +using SimpleModule.Cli.Infrastructure; +using Spectre.Console; +using Spectre.Console.Cli; + +namespace SimpleModule.Cli.Commands.Jobs; + +public sealed class JobsListScheduledCommand : Command +{ + public override int Execute(CommandContext context, JobsListScheduledSettings settings) + { + var (connection, provider) = ResolveConnection(settings); + if (connection is null) + { + AnsiConsole.MarkupLine( + "[red]No database connection. Pass --connection or run from a project with appsettings.json.[/]" + ); + return 1; + } + + try + { + using var conn = OpenConnection(provider, connection); + var rows = ReadSchedules(conn).ToList(); + if (rows.Count == 0) + { + AnsiConsole.MarkupLine("[yellow]No scheduled jobs found.[/]"); + return 0; + } + + var table = new Table().RoundedBorder(); + table.AddColumn("Name"); + table.AddColumn("Job type"); + table.AddColumn("Cron"); + table.AddColumn("TZ"); + table.AddColumn("Next run"); + table.AddColumn("Last run"); + table.AddColumn("Flags"); + + foreach (var row in rows.OrderBy(r => r.NextRunAt ?? DateTimeOffset.MaxValue)) + { + var flags = new List(); + if (!row.IsEnabled) + flags.Add("[red]disabled[/]"); + if (row.WithoutOverlapping) + flags.Add("mutex"); + if (row.OnOneServer) + flags.Add("single"); + + table.AddRow( + Markup.Escape(row.Name), + Markup.Escape(ShortType(row.JobTypeName)), + Markup.Escape(row.CronExpression), + Markup.Escape(row.TimeZoneId), + FormatTimestamp(row.NextRunAt), + FormatTimestamp(row.LastRunAt), + string.Join(", ", flags) + ); + } + + AnsiConsole.Write(table); + AnsiConsole.MarkupLine($"\n[dim]{rows.Count} scheduled job(s)[/]"); + return 0; + } +#pragma warning disable CA1031 + catch (Exception ex) +#pragma warning restore CA1031 + { + AnsiConsole.MarkupLine($"[red]Failed to read scheduled jobs:[/] {Markup.Escape(ex.Message)}"); + return 1; + } + } + + private static (string? Connection, string Provider) ResolveConnection( + JobsListScheduledSettings settings + ) + { + if (!string.IsNullOrWhiteSpace(settings.ConnectionString)) + { + return (settings.ConnectionString, settings.Provider ?? GuessProvider(settings.ConnectionString)); + } + + var solution = SolutionContext.Discover(); + if (solution is null) + return (null, "Sqlite"); + + var path = Path.Combine(solution.RootPath, "template", "SimpleModule.Host", "appsettings.json"); + if (!File.Exists(path)) + { + var hostDir = Directory + .EnumerateFiles(solution.RootPath, "appsettings.json", SearchOption.AllDirectories) + .FirstOrDefault(); + if (hostDir is null) + return (null, "Sqlite"); + path = hostDir; + } + + using var doc = JsonDocument.Parse(File.ReadAllText(path)); + if (!doc.RootElement.TryGetProperty("Database", out var db)) + return (null, "Sqlite"); + + var conn = db.TryGetProperty("DefaultConnection", out var cs) ? cs.GetString() : null; + var provider = db.TryGetProperty("Provider", out var p) ? p.GetString() : "Sqlite"; + return (conn, provider ?? "Sqlite"); + } + + private static string GuessProvider(string connection) => + connection.Contains("Data Source", StringComparison.OrdinalIgnoreCase) ? "Sqlite" : "Postgres"; + + private static DbConnection OpenConnection(string provider, string connectionString) + { + DbConnection conn = provider.Equals("Postgres", StringComparison.OrdinalIgnoreCase) + ? new NpgsqlConnection(connectionString) + : new SqliteConnection(connectionString); + conn.Open(); + return conn; + } + + private static IEnumerable ReadSchedules(DbConnection conn) + { + using var cmd = conn.CreateCommand(); + cmd.CommandText = """ + SELECT "Name", "JobTypeName", "CronExpression", "TimeZoneId", + "IsEnabled", "WithoutOverlapping", "OnOneServer", + "LastRunAt", "NextRunAt" + FROM "ScheduledJobStates" + """; + using var reader = cmd.ExecuteReader(); + while (reader.Read()) + { + yield return new ScheduleRow( + reader.GetString(0), + reader.GetString(1), + reader.GetString(2), + reader.GetString(3), + reader.GetBoolean(4), + reader.GetBoolean(5), + reader.GetBoolean(6), + ReadTimestamp(reader, 7), + ReadTimestamp(reader, 8) + ); + } + } + + private static DateTimeOffset? ReadTimestamp(DbDataReader reader, int ordinal) + { + if (reader.IsDBNull(ordinal)) + return null; + var raw = reader.GetValue(ordinal); + return raw switch + { + DateTimeOffset dto => dto, + DateTime dt => new DateTimeOffset(DateTime.SpecifyKind(dt, DateTimeKind.Utc)), + string s when DateTimeOffset.TryParse(s, CultureInfo.InvariantCulture, out var parsed) => parsed, + _ => null, + }; + } + + private static string FormatTimestamp(DateTimeOffset? value) => + value is null + ? "[dim]—[/]" + : Markup.Escape(value.Value.ToString("yyyy-MM-dd HH:mm:ss 'UTC'", CultureInfo.InvariantCulture)); + + // Mirrors BackgroundJobsInternalConstants.GetShortTypeName — kept inline because + // the CLI doesn't reference the BackgroundJobs assembly. + private static string ShortType(string assemblyQualifiedName) => + assemblyQualifiedName.Split(',')[0].Split('.').Last(); + + private sealed record ScheduleRow( + string Name, + string JobTypeName, + string CronExpression, + string TimeZoneId, + bool IsEnabled, + bool WithoutOverlapping, + bool OnOneServer, + DateTimeOffset? LastRunAt, + DateTimeOffset? NextRunAt + ); +} diff --git a/cli/SimpleModule.Cli/Commands/Jobs/JobsListScheduledSettings.cs b/cli/SimpleModule.Cli/Commands/Jobs/JobsListScheduledSettings.cs new file mode 100644 index 00000000..f4539e54 --- /dev/null +++ b/cli/SimpleModule.Cli/Commands/Jobs/JobsListScheduledSettings.cs @@ -0,0 +1,19 @@ +using System.ComponentModel; +using Spectre.Console.Cli; + +namespace SimpleModule.Cli.Commands.Jobs; + +public sealed class JobsListScheduledSettings : CommandSettings +{ + [Description( + "Override the database connection string (e.g. \"Data Source=app.db\" or " + + "\"Host=localhost;Database=app;Username=...;Password=...\"). " + + "Defaults to Database:DefaultConnection from appsettings*.json." + )] + [CommandOption("-c|--connection ")] + public string? ConnectionString { get; set; } + + [Description("Database provider when --connection is supplied: Sqlite or Postgres.")] + [CommandOption("-p|--provider ")] + public string? Provider { get; set; } +} diff --git a/cli/SimpleModule.Cli/Program.cs b/cli/SimpleModule.Cli/Program.cs index 3947d3ea..00b5d7c0 100644 --- a/cli/SimpleModule.Cli/Program.cs +++ b/cli/SimpleModule.Cli/Program.cs @@ -1,6 +1,7 @@ using SimpleModule.Cli.Commands.Dev; using SimpleModule.Cli.Commands.Doctor; using SimpleModule.Cli.Commands.Install; +using SimpleModule.Cli.Commands.Jobs; using SimpleModule.Cli.Commands.List; using SimpleModule.Cli.Commands.Maintenance; using SimpleModule.Cli.Commands.New; @@ -62,6 +63,18 @@ .AddCommand("doctor") .WithDescription("Validate project structure and conventions"); + config.AddBranch( + "jobs", + jobsBranch => + { + jobsBranch.SetDescription("Inspect background-job state"); + jobsBranch + .AddCommand("list-scheduled") + .WithDescription("List declarative scheduled jobs with next/last run times") + .WithExample("jobs", "list-scheduled"); + } + ); + config.AddBranch( "skill", skillBranch => diff --git a/cli/SimpleModule.Cli/SimpleModule.Cli.csproj b/cli/SimpleModule.Cli/SimpleModule.Cli.csproj index ff873fee..bc3e3020 100644 --- a/cli/SimpleModule.Cli/SimpleModule.Cli.csproj +++ b/cli/SimpleModule.Cli/SimpleModule.Cli.csproj @@ -9,6 +9,8 @@ + + diff --git a/docs/scheduler.md b/docs/scheduler.md new file mode 100644 index 00000000..24e14720 --- /dev/null +++ b/docs/scheduler.md @@ -0,0 +1,140 @@ +# Task Scheduler + +The `BackgroundJobs` module exposes a fluent, code-declared scheduler so any +module can register recurring work at startup. Definitions live in C#, are +reconciled to the database at every tick, and end up as ordinary +`JobQueueEntry` rows that the worker picks up like any other job. + +The scheduler is **declarative** — it complements (but does not replace) the +runtime API on `IBackgroundJobs` for ad-hoc recurring jobs created from +endpoints. + +## Quick start + +In your module's `ConfigureServices`, call `AddScheduledJobs`: + +```csharp +using SimpleModule.BackgroundJobs.Contracts; + +public class AuditLogsModule : IModule +{ + public void ConfigureServices(IServiceCollection services, IConfiguration configuration) + { + services.AddModuleJob(); + services.AddModuleJob(); + + services.AddScheduledJobs(scheduler => + { + scheduler.Job() + .DailyAt("02:00") + .Timezone("UTC") + .WithoutOverlapping(); + + scheduler.Job("daily-digest") + .Cron("0 8 * * MON-FRI") + .Timezone("America/New_York") + .OnOneServer(); + }); + } +} +``` + +Each `scheduler.Job()` registers a single declarative schedule. The job type +must implement `IModuleJob` and be registered with `AddModuleJob()` so the +worker can resolve it from the DI container. + +## DSL reference + +| Method | Cron produced | Notes | +|---|---|---| +| `Cron("0 8 * * MON-FRI")` | as given | Standard 5-field cron; 6-field with seconds also supported | +| `EveryMinutes(15)` | `*/15 * * * *` | 1–59 only | +| `Hourly()` | `0 * * * *` | Top of each hour | +| `Daily()` | `0 0 * * *` | Midnight | +| `DailyAt("13:45")` | `45 13 * * *` | 24h `HH:mm` | +| `Weekdays()` | `0 0 * * MON-FRI` | Midnight Mon–Fri | +| `Timezone("UTC")` | — | IANA timezone for cron evaluation (default `UTC`) | +| `WithoutOverlapping()` | — | Skip if the prior run is still in-flight | +| `OnOneServer()` | — | Single elected host enqueues when many run the scheduler | +| `WithPayload(obj)` | — | Serialised to JSON for `IJobExecutionContext.Data` | + +## Semantics + +### Reconciliation + +Every tick (default 30s — configurable via `BackgroundJobs:Scheduler:TickInterval`) +the scheduler upserts each in-memory definition into the `ScheduledJobStates` +table. Changing cron/timezone in code is applied on the next tick; `LastRunAt` +and `NextRunAt` are preserved across reconciles unless the cron actually +changed. + +A definition removed from code does **not** automatically delete its row — the +row keeps its `IsEnabled` value, the scheduler ignores rows whose definitions +are no longer in the registry, and the worker won't enqueue them again. Manually +truncate the table if you need to purge orphans. + +### `WithoutOverlapping()` + +Backed by a per-job mutex row in `JobMutexes`. Before enqueueing, the scheduler +calls `IJobMutex.TryAcquireAsync(name)`; if the mutex is held by an in-flight +run, the tick *skips* enqueueing (and advances `NextRunAt`). When the worker +finishes (success or failure), it releases the mutex so the next tick can +re-enqueue. + +Mutex TTL defaults to 1 hour; configure via `BackgroundJobs:Scheduler:MutexTtl`. +A stuck worker can't block a schedule forever — the TTL provides the safety +net. + +### `OnOneServer()` + +Backed by a single `scheduler` lease row in `JobLeases`. On every tick a host +tries to acquire the lease; only the holder runs the rest of the tick. Lease +TTL defaults to 1 minute (≈2× tick interval); set via +`BackgroundJobs:Scheduler:LeaseTtl`. + +If a definition has `OnOneServer()` *anywhere* in the registry, the lease is +required for that host's tick. Hosts without the lease skip silently. + +### Failure isolation + +A definition with a bad cron expression is logged at `Error` and the tick +continues with the rest. The bad definition's `NextRunAt` stays `null` and no +work is enqueued for it until the cron is fixed. + +## Inspecting state + +The host writes `ScheduledJobStates`, `JobMutexes`, and `JobLeases` to the +database configured by `Database:DefaultConnection`. From the CLI: + +```bash +sm jobs list-scheduled # uses appsettings.json +sm jobs list-scheduled --connection "Data Source=app.db" --provider Sqlite +sm jobs list-scheduled --connection "Host=...;Database=..." --provider Postgres +``` + +The command prints Name, Job type, Cron, TZ, Next run, Last run, and flags +(`mutex`, `single`, `disabled`). + +## Worker mode + +The hosted `SchedulerService` is registered as `IHostedService` only when the +module is in `Consumer` mode (the same condition that registers +`JobProcessorService`). Producer-only hosts still register `IScheduler` so +`AddScheduledJobs` succeeds — but no ticks run there. + +## Configuration + +```jsonc +{ + "BackgroundJobs": { + "Scheduler": { + "TickInterval": "00:00:30", + "LeaseTtl": "00:01:00", + "MutexTtl": "01:00:00" + } + } +} +``` + +Sensible defaults are baked in; override only when tuning for a specific +deployment. diff --git a/modules/BackgroundJobs/src/SimpleModule.BackgroundJobs.Contracts/BackgroundJobsServiceExtensions.cs b/modules/BackgroundJobs/src/SimpleModule.BackgroundJobs.Contracts/BackgroundJobsServiceExtensions.cs index 53f7183d..eae54fef 100644 --- a/modules/BackgroundJobs/src/SimpleModule.BackgroundJobs.Contracts/BackgroundJobsServiceExtensions.cs +++ b/modules/BackgroundJobs/src/SimpleModule.BackgroundJobs.Contracts/BackgroundJobsServiceExtensions.cs @@ -1,4 +1,6 @@ using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection.Extensions; +using SimpleModule.Core; namespace SimpleModule.BackgroundJobs.Contracts; @@ -11,8 +13,53 @@ public static IServiceCollection AddModuleJob(this IServiceCollection serv services.AddSingleton(new ModuleJobRegistration(typeof(TJob))); return services; } + + /// + /// Register declarative scheduled jobs. Safe to call from multiple modules: + /// the first call creates the shared ; subsequent + /// calls reuse the same instance. Each registered job type is auto-registered + /// as a so callers don't need a separate + /// call. + /// + public static IServiceCollection AddScheduledJobs( + this IServiceCollection services, + Action configure + ) + { + ArgumentNullException.ThrowIfNull(configure); + + var descriptor = services.FirstOrDefault(s => + s.ServiceType == typeof(IScheduler) && s.ImplementationInstance is SchedulerRegistry + ); + + SchedulerRegistry registry; + if (descriptor?.ImplementationInstance is SchedulerRegistry existing) + { + registry = existing; + } + else + { + registry = new SchedulerRegistry(); + services.RemoveAll(); + services.AddSingleton(registry); + } + + var before = registry.Definitions.Select(d => d.JobType).ToHashSet(); + configure(registry); + + foreach (var def in registry.Definitions) + { + if (before.Contains(def.JobType)) + continue; + services.TryAddScoped(def.JobType); + services.AddSingleton(new ModuleJobRegistration(def.JobType)); + } + + return services; + } } +[NoDtoGeneration] public class ModuleJobRegistration { public ModuleJobRegistration(Type jobType) diff --git a/modules/BackgroundJobs/src/SimpleModule.BackgroundJobs.Contracts/IScheduledJob.cs b/modules/BackgroundJobs/src/SimpleModule.BackgroundJobs.Contracts/IScheduledJob.cs new file mode 100644 index 00000000..521bfbca --- /dev/null +++ b/modules/BackgroundJobs/src/SimpleModule.BackgroundJobs.Contracts/IScheduledJob.cs @@ -0,0 +1,36 @@ +namespace SimpleModule.BackgroundJobs.Contracts; + +/// +/// Fluent builder returned by . All +/// methods mutate the underlying and return +/// the same builder so calls can chain. +/// +public interface IScheduledJob + where TJob : IModuleJob +{ + IScheduledJob Cron(string expression); + + IScheduledJob EveryMinutes(int minutes); + + IScheduledJob Hourly(); + + IScheduledJob Daily(); + + /// Run once a day at formatted as HH:mm. + IScheduledJob DailyAt(string time); + + /// Run at midnight Monday-Friday in the configured timezone. + IScheduledJob Weekdays(); + + /// IANA timezone id (e.g. UTC, America/New_York). + IScheduledJob Timezone(string tz); + + /// Skip the run if a previous invocation of the same job is still in-flight. + IScheduledJob WithoutOverlapping(); + + /// Run on a single host even when many hosts share the schedule. + IScheduledJob OnOneServer(); + + /// Attach a payload object that is serialised into . + IScheduledJob WithPayload(object payload); +} diff --git a/modules/BackgroundJobs/src/SimpleModule.BackgroundJobs.Contracts/IScheduler.cs b/modules/BackgroundJobs/src/SimpleModule.BackgroundJobs.Contracts/IScheduler.cs new file mode 100644 index 00000000..3f65ebd0 --- /dev/null +++ b/modules/BackgroundJobs/src/SimpleModule.BackgroundJobs.Contracts/IScheduler.cs @@ -0,0 +1,14 @@ +namespace SimpleModule.BackgroundJobs.Contracts; + +/// +/// Declarative scheduling surface for recurring jobs. Modules call this from +/// IModule.ConfigureServices to register schedules at startup time; +/// the hosted SchedulerService turns due definitions into queued jobs. +/// +public interface IScheduler +{ + IScheduledJob Job(string? name = null) + where TJob : IModuleJob; + + IReadOnlyList Definitions { get; } +} diff --git a/modules/BackgroundJobs/src/SimpleModule.BackgroundJobs.Contracts/JobLease.cs b/modules/BackgroundJobs/src/SimpleModule.BackgroundJobs.Contracts/JobLease.cs new file mode 100644 index 00000000..17215a0b --- /dev/null +++ b/modules/BackgroundJobs/src/SimpleModule.BackgroundJobs.Contracts/JobLease.cs @@ -0,0 +1,17 @@ +using SimpleModule.Core; + +namespace SimpleModule.BackgroundJobs.Contracts; + +/// +/// Single-leader lease row used by OnOneServer. Hosts call +/// IInstanceLeader.TryAcquireAsync; only the holder runs the tick. +/// +[NoDtoGeneration] +public class JobLease +{ + public string Name { get; set; } = string.Empty; + public string OwnerWorkerId { get; set; } = string.Empty; + public DateTimeOffset AcquiredAt { get; set; } + public DateTimeOffset ExpiresAt { get; set; } + public string ConcurrencyStamp { get; set; } = string.Empty; +} diff --git a/modules/BackgroundJobs/src/SimpleModule.BackgroundJobs.Contracts/JobMutex.cs b/modules/BackgroundJobs/src/SimpleModule.BackgroundJobs.Contracts/JobMutex.cs new file mode 100644 index 00000000..1ebe7315 --- /dev/null +++ b/modules/BackgroundJobs/src/SimpleModule.BackgroundJobs.Contracts/JobMutex.cs @@ -0,0 +1,17 @@ +using SimpleModule.Core; + +namespace SimpleModule.BackgroundJobs.Contracts; + +/// +/// Per-job mutex row supporting WithoutOverlapping. Acquired by the scheduler +/// before enqueueing and released by the worker after execution finishes. +/// +[NoDtoGeneration] +public class JobMutex +{ + public string Name { get; set; } = string.Empty; + public string OwnerWorkerId { get; set; } = string.Empty; + public DateTimeOffset AcquiredAt { get; set; } + public DateTimeOffset ExpiresAt { get; set; } + public string ConcurrencyStamp { get; set; } = string.Empty; +} diff --git a/modules/BackgroundJobs/src/SimpleModule.BackgroundJobs.Contracts/ScheduledJobBuilder.cs b/modules/BackgroundJobs/src/SimpleModule.BackgroundJobs.Contracts/ScheduledJobBuilder.cs new file mode 100644 index 00000000..0b770022 --- /dev/null +++ b/modules/BackgroundJobs/src/SimpleModule.BackgroundJobs.Contracts/ScheduledJobBuilder.cs @@ -0,0 +1,97 @@ +using System.Globalization; + +namespace SimpleModule.BackgroundJobs.Contracts; + +/// +/// Default fluent builder backing . Each method +/// mutates the supplied and returns the same +/// instance. Cron parsing is intentionally not performed here so the Contracts +/// assembly stays free of the Cronos dependency; the scheduler validates and +/// reports per-definition errors at tick time. +/// +internal sealed class ScheduledJobBuilder(ScheduledJobDefinition definition) + : IScheduledJob + where TJob : IModuleJob +{ + public IScheduledJob Cron(string expression) + { + ArgumentException.ThrowIfNullOrWhiteSpace(expression); + definition.CronExpression = expression; + return this; + } + + public IScheduledJob EveryMinutes(int minutes) + { + if (minutes < 1 || minutes > 59) + { + throw new ArgumentOutOfRangeException( + nameof(minutes), + "EveryMinutes requires a value between 1 and 59." + ); + } + return Cron($"*/{minutes.ToString(CultureInfo.InvariantCulture)} * * * *"); + } + + public IScheduledJob Hourly() => Cron("0 * * * *"); + + public IScheduledJob Daily() => Cron("0 0 * * *"); + + public IScheduledJob DailyAt(string time) + { + ArgumentException.ThrowIfNullOrWhiteSpace(time); + if ( + !TimeOnly.TryParseExact( + time, + "HH:mm", + CultureInfo.InvariantCulture, + DateTimeStyles.None, + out var parsed + ) + ) + { + throw new ArgumentException( + $"DailyAt expects a 24h 'HH:mm' value, got '{time}'.", + nameof(time) + ); + } + return Cron( + $"{parsed.Minute.ToString(CultureInfo.InvariantCulture)} {parsed.Hour.ToString(CultureInfo.InvariantCulture)} * * *" + ); + } + + public IScheduledJob Weekdays() => Cron("0 0 * * MON-FRI"); + + public IScheduledJob Timezone(string tz) + { + ArgumentException.ThrowIfNullOrWhiteSpace(tz); + try + { + _ = TimeZoneInfo.FindSystemTimeZoneById(tz); + } + catch (TimeZoneNotFoundException ex) + { + throw new ArgumentException($"Unknown timezone '{tz}'.", nameof(tz), ex); + } + definition.TimeZoneId = tz; + return this; + } + + public IScheduledJob WithoutOverlapping() + { + definition.WithoutOverlapping = true; + return this; + } + + public IScheduledJob OnOneServer() + { + definition.OnOneServer = true; + return this; + } + + public IScheduledJob WithPayload(object payload) + { + ArgumentNullException.ThrowIfNull(payload); + definition.Payload = payload; + return this; + } +} diff --git a/modules/BackgroundJobs/src/SimpleModule.BackgroundJobs.Contracts/ScheduledJobDefinition.cs b/modules/BackgroundJobs/src/SimpleModule.BackgroundJobs.Contracts/ScheduledJobDefinition.cs new file mode 100644 index 00000000..14e897e3 --- /dev/null +++ b/modules/BackgroundJobs/src/SimpleModule.BackgroundJobs.Contracts/ScheduledJobDefinition.cs @@ -0,0 +1,20 @@ +using SimpleModule.Core; + +namespace SimpleModule.BackgroundJobs.Contracts; + +/// +/// In-memory description of a code-declared scheduled job. Populated by the +/// fluent builder and consumed by +/// SchedulerService at tick time. +/// +[NoDtoGeneration] +public sealed class ScheduledJobDefinition +{ + public string Name { get; set; } = string.Empty; + public Type JobType { get; set; } = null!; + public string CronExpression { get; set; } = string.Empty; + public string TimeZoneId { get; set; } = "UTC"; + public object? Payload { get; set; } + public bool WithoutOverlapping { get; set; } + public bool OnOneServer { get; set; } +} diff --git a/modules/BackgroundJobs/src/SimpleModule.BackgroundJobs.Contracts/ScheduledJobDto.cs b/modules/BackgroundJobs/src/SimpleModule.BackgroundJobs.Contracts/ScheduledJobDto.cs new file mode 100644 index 00000000..d298a1fd --- /dev/null +++ b/modules/BackgroundJobs/src/SimpleModule.BackgroundJobs.Contracts/ScheduledJobDto.cs @@ -0,0 +1,17 @@ +using SimpleModule.Core; + +namespace SimpleModule.BackgroundJobs.Contracts; + +[Dto] +public class ScheduledJobDto +{ + public string Name { get; set; } = string.Empty; + public string JobType { get; set; } = string.Empty; + public string CronExpression { get; set; } = string.Empty; + public string TimeZoneId { get; set; } = "UTC"; + public bool WithoutOverlapping { get; set; } + public bool OnOneServer { get; set; } + public bool IsEnabled { get; set; } + public DateTimeOffset? LastRunAt { get; set; } + public DateTimeOffset? NextRunAt { get; set; } +} diff --git a/modules/BackgroundJobs/src/SimpleModule.BackgroundJobs.Contracts/ScheduledJobState.cs b/modules/BackgroundJobs/src/SimpleModule.BackgroundJobs.Contracts/ScheduledJobState.cs new file mode 100644 index 00000000..a9107169 --- /dev/null +++ b/modules/BackgroundJobs/src/SimpleModule.BackgroundJobs.Contracts/ScheduledJobState.cs @@ -0,0 +1,27 @@ +using SimpleModule.Core; +using SimpleModule.Core.Entities; + +namespace SimpleModule.BackgroundJobs.Contracts; + +/// +/// Persisted state for a code-declared scheduled job. Keyed by ; +/// the scheduler service upserts on reconcile and bumps LastRunAt/NextRunAt +/// on every tick. +/// +[NoDtoGeneration] +public class ScheduledJobState : IHasCreationTime, IHasModificationTime, IHasConcurrencyStamp +{ + public string Name { get; set; } = string.Empty; + public string JobTypeName { get; set; } = string.Empty; + public string CronExpression { get; set; } = string.Empty; + public string TimeZoneId { get; set; } = "UTC"; + public string? Payload { get; set; } + public bool WithoutOverlapping { get; set; } + public bool OnOneServer { get; set; } + public bool IsEnabled { get; set; } = true; + public DateTimeOffset? LastRunAt { get; set; } + public DateTimeOffset? NextRunAt { get; set; } + public DateTimeOffset CreatedAt { get; set; } + public DateTimeOffset UpdatedAt { get; set; } + public string ConcurrencyStamp { get; set; } = string.Empty; +} diff --git a/modules/BackgroundJobs/src/SimpleModule.BackgroundJobs.Contracts/SchedulerRegistry.cs b/modules/BackgroundJobs/src/SimpleModule.BackgroundJobs.Contracts/SchedulerRegistry.cs new file mode 100644 index 00000000..3fb33612 --- /dev/null +++ b/modules/BackgroundJobs/src/SimpleModule.BackgroundJobs.Contracts/SchedulerRegistry.cs @@ -0,0 +1,51 @@ +using SimpleModule.Core; + +namespace SimpleModule.BackgroundJobs.Contracts; + +/// +/// Singleton implementation of . Modules invoke +/// services.AddScheduledJobs(scheduler => scheduler.Job<T>()...) during +/// ConfigureServices; the registry accumulates definitions which the +/// hosted SchedulerService reconciles against the database on each tick. +/// +[NoDtoGeneration] +public sealed class SchedulerRegistry : IScheduler +{ + private readonly List _definitions = []; + private readonly Lock _lock = new(); + + public IScheduledJob Job(string? name = null) + where TJob : IModuleJob + { + var jobType = typeof(TJob); + var resolvedName = string.IsNullOrWhiteSpace(name) ? jobType.FullName! : name; + + lock (_lock) + { + var existing = _definitions.FirstOrDefault(d => + string.Equals(d.Name, resolvedName, StringComparison.Ordinal) + ); + if (existing is not null) + { + throw new InvalidOperationException( + $"A scheduled job named '{resolvedName}' is already registered." + ); + } + + var definition = new ScheduledJobDefinition { Name = resolvedName, JobType = jobType }; + _definitions.Add(definition); + return new ScheduledJobBuilder(definition); + } + } + + public IReadOnlyList Definitions + { + get + { + lock (_lock) + { + return [.. _definitions]; + } + } + } +} diff --git a/modules/BackgroundJobs/src/SimpleModule.BackgroundJobs/BackgroundJobsDbContext.cs b/modules/BackgroundJobs/src/SimpleModule.BackgroundJobs/BackgroundJobsDbContext.cs index bbecf5aa..ca2aa0d9 100644 --- a/modules/BackgroundJobs/src/SimpleModule.BackgroundJobs/BackgroundJobsDbContext.cs +++ b/modules/BackgroundJobs/src/SimpleModule.BackgroundJobs/BackgroundJobsDbContext.cs @@ -14,11 +14,17 @@ IOptions dbOptions { public DbSet JobProgress => Set(); public DbSet JobQueueEntries => Set(); + public DbSet ScheduledJobStates => Set(); + public DbSet JobMutexes => Set(); + public DbSet JobLeases => Set(); protected override void OnModelCreating(ModelBuilder modelBuilder) { modelBuilder.ApplyConfiguration(new JobProgressConfiguration()); modelBuilder.ApplyConfiguration(new JobQueueEntryConfiguration()); + modelBuilder.ApplyConfiguration(new ScheduledJobStateConfiguration()); + modelBuilder.ApplyConfiguration(new JobMutexConfiguration()); + modelBuilder.ApplyConfiguration(new JobLeaseConfiguration()); modelBuilder.ApplyModuleSchema(BackgroundJobsConstants.ModuleName, dbOptions.Value); } diff --git a/modules/BackgroundJobs/src/SimpleModule.BackgroundJobs/BackgroundJobsModule.cs b/modules/BackgroundJobs/src/SimpleModule.BackgroundJobs/BackgroundJobsModule.cs index 527a31fb..ccfc731c 100644 --- a/modules/BackgroundJobs/src/SimpleModule.BackgroundJobs/BackgroundJobsModule.cs +++ b/modules/BackgroundJobs/src/SimpleModule.BackgroundJobs/BackgroundJobsModule.cs @@ -3,9 +3,11 @@ using Microsoft.EntityFrameworkCore.Storage; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection.Extensions; using Microsoft.Extensions.Hosting; using SimpleModule.BackgroundJobs.Contracts; using SimpleModule.BackgroundJobs.Queue; +using SimpleModule.BackgroundJobs.Scheduler; using SimpleModule.BackgroundJobs.Services; using SimpleModule.BackgroundJobs.Worker; using SimpleModule.Core; @@ -46,6 +48,17 @@ public void ConfigureServices(IServiceCollection services, IConfiguration config services.AddScoped(); services.AddScoped(); + // Scheduler primitives — available in both producer and consumer hosts so + // modules can register schedules anywhere. + if (!services.Any(s => s.ServiceType == typeof(IScheduler))) + { + services.AddSingleton(new SchedulerRegistry()); + } + services.AddScoped(); + services.AddScoped(); + services.Configure(configuration.GetSection("BackgroundJobs:Scheduler")); + services.TryAddSingleton(TimeProvider.System); + // Progress flushing runs in whichever host owns the module — both producer and consumer. services.AddHostedService(); @@ -54,6 +67,7 @@ public void ConfigureServices(IServiceCollection services, IConfiguration config services.AddSingleton(WorkerIdentity.Create()); services.AddHostedService(); services.AddHostedService(); + services.AddHostedService(); } } diff --git a/modules/BackgroundJobs/src/SimpleModule.BackgroundJobs/EntityConfigurations/JobLeaseConfiguration.cs b/modules/BackgroundJobs/src/SimpleModule.BackgroundJobs/EntityConfigurations/JobLeaseConfiguration.cs new file mode 100644 index 00000000..142b1c36 --- /dev/null +++ b/modules/BackgroundJobs/src/SimpleModule.BackgroundJobs/EntityConfigurations/JobLeaseConfiguration.cs @@ -0,0 +1,19 @@ +using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Metadata.Builders; +using SimpleModule.BackgroundJobs.Contracts; + +namespace SimpleModule.BackgroundJobs.EntityConfigurations; + +public class JobLeaseConfiguration : IEntityTypeConfiguration +{ + public void Configure(EntityTypeBuilder builder) + { + builder.ToTable("JobLeases"); + builder.HasKey(l => l.Name); + builder.Property(l => l.Name).HasMaxLength(200).IsRequired(); + builder.Property(l => l.OwnerWorkerId).HasMaxLength(100).IsRequired(); + builder.Property(l => l.AcquiredAt).IsRequired(); + builder.Property(l => l.ExpiresAt).IsRequired(); + builder.Property(l => l.ConcurrencyStamp).HasMaxLength(64).IsConcurrencyToken(); + } +} diff --git a/modules/BackgroundJobs/src/SimpleModule.BackgroundJobs/EntityConfigurations/JobMutexConfiguration.cs b/modules/BackgroundJobs/src/SimpleModule.BackgroundJobs/EntityConfigurations/JobMutexConfiguration.cs new file mode 100644 index 00000000..d4dc4ef6 --- /dev/null +++ b/modules/BackgroundJobs/src/SimpleModule.BackgroundJobs/EntityConfigurations/JobMutexConfiguration.cs @@ -0,0 +1,19 @@ +using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Metadata.Builders; +using SimpleModule.BackgroundJobs.Contracts; + +namespace SimpleModule.BackgroundJobs.EntityConfigurations; + +public class JobMutexConfiguration : IEntityTypeConfiguration +{ + public void Configure(EntityTypeBuilder builder) + { + builder.ToTable("JobMutexes"); + builder.HasKey(m => m.Name); + builder.Property(m => m.Name).HasMaxLength(200).IsRequired(); + builder.Property(m => m.OwnerWorkerId).HasMaxLength(100).IsRequired(); + builder.Property(m => m.AcquiredAt).IsRequired(); + builder.Property(m => m.ExpiresAt).IsRequired(); + builder.Property(m => m.ConcurrencyStamp).HasMaxLength(64).IsConcurrencyToken(); + } +} diff --git a/modules/BackgroundJobs/src/SimpleModule.BackgroundJobs/EntityConfigurations/ScheduledJobStateConfiguration.cs b/modules/BackgroundJobs/src/SimpleModule.BackgroundJobs/EntityConfigurations/ScheduledJobStateConfiguration.cs new file mode 100644 index 00000000..f5424e40 --- /dev/null +++ b/modules/BackgroundJobs/src/SimpleModule.BackgroundJobs/EntityConfigurations/ScheduledJobStateConfiguration.cs @@ -0,0 +1,31 @@ +using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Metadata.Builders; +using SimpleModule.BackgroundJobs.Contracts; + +namespace SimpleModule.BackgroundJobs.EntityConfigurations; + +public class ScheduledJobStateConfiguration : IEntityTypeConfiguration +{ + public void Configure(EntityTypeBuilder builder) + { + builder.ToTable("ScheduledJobStates"); + builder.HasKey(s => s.Name); + builder.Property(s => s.Name).HasMaxLength(200).IsRequired(); + builder.Property(s => s.JobTypeName).HasMaxLength(500).IsRequired(); + builder.Property(s => s.CronExpression).HasMaxLength(100).IsRequired(); + builder.Property(s => s.TimeZoneId).HasMaxLength(100).IsRequired(); + builder.Property(s => s.Payload); + builder.Property(s => s.IsEnabled).IsRequired(); + builder.Property(s => s.WithoutOverlapping).IsRequired(); + builder.Property(s => s.OnOneServer).IsRequired(); + builder.Property(s => s.LastRunAt); + builder.Property(s => s.NextRunAt); + builder.Property(s => s.CreatedAt).IsRequired(); + builder.Property(s => s.UpdatedAt).IsRequired(); + builder.Property(s => s.ConcurrencyStamp).HasMaxLength(64); + + builder + .HasIndex(s => new { s.IsEnabled, s.NextRunAt }) + .HasDatabaseName("IX_ScheduledJobStates_Enabled_NextRunAt"); + } +} diff --git a/modules/BackgroundJobs/src/SimpleModule.BackgroundJobs/Scheduler/CronCalculator.cs b/modules/BackgroundJobs/src/SimpleModule.BackgroundJobs/Scheduler/CronCalculator.cs new file mode 100644 index 00000000..f6a44d61 --- /dev/null +++ b/modules/BackgroundJobs/src/SimpleModule.BackgroundJobs/Scheduler/CronCalculator.cs @@ -0,0 +1,31 @@ +using Cronos; + +namespace SimpleModule.BackgroundJobs.Scheduler; + +internal static class CronCalculator +{ + public static CronExpression Parse(string expression) + { + var format = + expression.Split(' ').Length > 5 ? CronFormat.IncludeSeconds : CronFormat.Standard; + return CronExpression.Parse(expression, format); + } + + public static DateTimeOffset? GetNextOccurrence( + string expression, + string timeZoneId, + DateTimeOffset fromInclusiveExclusive + ) + { + var cron = Parse(expression); + var tz = TimeZoneInfo.FindSystemTimeZoneById(timeZoneId); + return cron.GetNextOccurrence(fromInclusiveExclusive, tz, inclusive: false); + } + + /// UTC-only next-occurrence helper for the legacy recurring path. + public static DateTimeOffset? GetNextOccurrenceUtc(string expression, DateTime fromUtc) + { + var next = Parse(expression).GetNextOccurrence(fromUtc, inclusive: false); + return next is null ? null : new DateTimeOffset(next.Value, TimeSpan.Zero); + } +} diff --git a/modules/BackgroundJobs/src/SimpleModule.BackgroundJobs/Scheduler/DatabaseInstanceLeader.cs b/modules/BackgroundJobs/src/SimpleModule.BackgroundJobs/Scheduler/DatabaseInstanceLeader.cs new file mode 100644 index 00000000..d82bcd91 --- /dev/null +++ b/modules/BackgroundJobs/src/SimpleModule.BackgroundJobs/Scheduler/DatabaseInstanceLeader.cs @@ -0,0 +1,79 @@ +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.Logging; +using SimpleModule.BackgroundJobs.Contracts; + +namespace SimpleModule.BackgroundJobs.Scheduler; + +internal sealed partial class DatabaseInstanceLeader( + BackgroundJobsDbContext db, + ILogger logger +) : IInstanceLeader +{ + public async Task TryAcquireAsync( + string name, + string ownerId, + TimeSpan ttl, + CancellationToken ct = default + ) + { + ArgumentException.ThrowIfNullOrWhiteSpace(name); + ArgumentException.ThrowIfNullOrWhiteSpace(ownerId); + + var now = DateTimeOffset.UtcNow; + var expires = now + ttl; + var existing = await db.JobLeases.FirstOrDefaultAsync(l => l.Name == name, ct); + + if (existing is null) + { + db.JobLeases.Add( + new JobLease + { + Name = name, + OwnerWorkerId = ownerId, + AcquiredAt = now, + ExpiresAt = expires, + ConcurrencyStamp = Guid.NewGuid().ToString("N"), + } + ); + try + { + await db.SaveChangesAsync(ct); + LogAcquired(logger, name, ownerId); + return true; + } + catch (DbUpdateException ex) + { + LogContended(logger, name, ex.Message); + return false; + } + } + + if (existing.ExpiresAt > now && !string.Equals(existing.OwnerWorkerId, ownerId, StringComparison.Ordinal)) + { + return false; + } + + existing.OwnerWorkerId = ownerId; + existing.AcquiredAt = now; + existing.ExpiresAt = expires; + existing.ConcurrencyStamp = Guid.NewGuid().ToString("N"); + + try + { + await db.SaveChangesAsync(ct); + LogAcquired(logger, name, ownerId); + return true; + } + catch (DbUpdateConcurrencyException) + { + LogContended(logger, name, "concurrency"); + return false; + } + } + + [LoggerMessage(Level = LogLevel.Debug, Message = "Lease '{Name}' acquired by {Owner}")] + private static partial void LogAcquired(ILogger logger, string name, string owner); + + [LoggerMessage(Level = LogLevel.Debug, Message = "Lease '{Name}' contended: {Reason}")] + private static partial void LogContended(ILogger logger, string name, string reason); +} diff --git a/modules/BackgroundJobs/src/SimpleModule.BackgroundJobs/Scheduler/DatabaseJobMutex.cs b/modules/BackgroundJobs/src/SimpleModule.BackgroundJobs/Scheduler/DatabaseJobMutex.cs new file mode 100644 index 00000000..72835319 --- /dev/null +++ b/modules/BackgroundJobs/src/SimpleModule.BackgroundJobs/Scheduler/DatabaseJobMutex.cs @@ -0,0 +1,100 @@ +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.Logging; +using SimpleModule.BackgroundJobs.Contracts; + +namespace SimpleModule.BackgroundJobs.Scheduler; + +internal sealed partial class DatabaseJobMutex( + BackgroundJobsDbContext db, + ILogger logger +) : IJobMutex +{ + public async Task TryAcquireAsync( + string name, + string ownerId, + TimeSpan ttl, + CancellationToken ct = default + ) + { + ArgumentException.ThrowIfNullOrWhiteSpace(name); + ArgumentException.ThrowIfNullOrWhiteSpace(ownerId); + + var now = DateTimeOffset.UtcNow; + var expires = now + ttl; + var existing = await db.JobMutexes.FirstOrDefaultAsync(m => m.Name == name, ct); + + if (existing is null) + { + db.JobMutexes.Add( + new JobMutex + { + Name = name, + OwnerWorkerId = ownerId, + AcquiredAt = now, + ExpiresAt = expires, + ConcurrencyStamp = Guid.NewGuid().ToString("N"), + } + ); + try + { + await db.SaveChangesAsync(ct); + LogAcquired(logger, name, ownerId); + return true; + } + catch (DbUpdateException ex) + { + LogContended(logger, name, ex.Message); + return false; + } + } + + if (existing.ExpiresAt > now && !string.Equals(existing.OwnerWorkerId, ownerId, StringComparison.Ordinal)) + { + return false; + } + + existing.OwnerWorkerId = ownerId; + existing.AcquiredAt = now; + existing.ExpiresAt = expires; + existing.ConcurrencyStamp = Guid.NewGuid().ToString("N"); + + try + { + await db.SaveChangesAsync(ct); + LogAcquired(logger, name, ownerId); + return true; + } + catch (DbUpdateConcurrencyException) + { + LogContended(logger, name, "concurrency"); + return false; + } + } + + public async Task ReleaseAsync(string name, CancellationToken ct = default) + { + ArgumentException.ThrowIfNullOrWhiteSpace(name); + var row = await db.JobMutexes.FirstOrDefaultAsync(m => m.Name == name, ct); + if (row is null) + return; + db.JobMutexes.Remove(row); + try + { + await db.SaveChangesAsync(ct); + LogReleased(logger, name); + } + catch (DbUpdateConcurrencyException) + { + // Another worker beat us to release; that's fine — mutex is gone either way. + } + } + + [LoggerMessage(Level = LogLevel.Debug, Message = "Mutex '{Name}' acquired by {Owner}")] + private static partial void LogAcquired(ILogger logger, string name, string owner); + + [LoggerMessage(Level = LogLevel.Debug, Message = "Mutex '{Name}' contended: {Reason}")] + private static partial void LogContended(ILogger logger, string name, string reason); + + [LoggerMessage(Level = LogLevel.Debug, Message = "Mutex '{Name}' released")] + private static partial void LogReleased(ILogger logger, string name); +} diff --git a/modules/BackgroundJobs/src/SimpleModule.BackgroundJobs/Scheduler/IInstanceLeader.cs b/modules/BackgroundJobs/src/SimpleModule.BackgroundJobs/Scheduler/IInstanceLeader.cs new file mode 100644 index 00000000..53cf7bbe --- /dev/null +++ b/modules/BackgroundJobs/src/SimpleModule.BackgroundJobs/Scheduler/IInstanceLeader.cs @@ -0,0 +1,15 @@ +namespace SimpleModule.BackgroundJobs.Scheduler; + +internal interface IInstanceLeader +{ + /// + /// Try to acquire (or renew) a named lease for . + /// Returns true when the caller now holds the lease. + /// + Task TryAcquireAsync( + string name, + string ownerId, + TimeSpan ttl, + CancellationToken ct = default + ); +} diff --git a/modules/BackgroundJobs/src/SimpleModule.BackgroundJobs/Scheduler/IJobMutex.cs b/modules/BackgroundJobs/src/SimpleModule.BackgroundJobs/Scheduler/IJobMutex.cs new file mode 100644 index 00000000..a134e3de --- /dev/null +++ b/modules/BackgroundJobs/src/SimpleModule.BackgroundJobs/Scheduler/IJobMutex.cs @@ -0,0 +1,13 @@ +namespace SimpleModule.BackgroundJobs.Scheduler; + +internal interface IJobMutex +{ + Task TryAcquireAsync( + string name, + string ownerId, + TimeSpan ttl, + CancellationToken ct = default + ); + + Task ReleaseAsync(string name, CancellationToken ct = default); +} diff --git a/modules/BackgroundJobs/src/SimpleModule.BackgroundJobs/Scheduler/SchedulerOptions.cs b/modules/BackgroundJobs/src/SimpleModule.BackgroundJobs/Scheduler/SchedulerOptions.cs new file mode 100644 index 00000000..eb735609 --- /dev/null +++ b/modules/BackgroundJobs/src/SimpleModule.BackgroundJobs/Scheduler/SchedulerOptions.cs @@ -0,0 +1,17 @@ +namespace SimpleModule.BackgroundJobs.Scheduler; + +public sealed class SchedulerOptions +{ + public const string LeaseName = "scheduler"; + public const string ScheduledJobSentinel = "schedule:"; + public const string MutexPrefix = "mutex:"; + + /// How often the scheduler tick runs. + public TimeSpan TickInterval { get; set; } = TimeSpan.FromSeconds(30); + + /// Lease TTL for OnOneServer election; should comfortably exceed TickInterval. + public TimeSpan LeaseTtl { get; set; } = TimeSpan.FromMinutes(1); + + /// Default mutex TTL applied when WithoutOverlapping is set. + public TimeSpan MutexTtl { get; set; } = TimeSpan.FromHours(1); +} diff --git a/modules/BackgroundJobs/src/SimpleModule.BackgroundJobs/Scheduler/SchedulerReconciler.cs b/modules/BackgroundJobs/src/SimpleModule.BackgroundJobs/Scheduler/SchedulerReconciler.cs new file mode 100644 index 00000000..3f2bd06c --- /dev/null +++ b/modules/BackgroundJobs/src/SimpleModule.BackgroundJobs/Scheduler/SchedulerReconciler.cs @@ -0,0 +1,132 @@ +using System.Text.Json; +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.Logging; +using SimpleModule.BackgroundJobs.Contracts; + +namespace SimpleModule.BackgroundJobs.Scheduler; + +internal static partial class SchedulerReconciler +{ + /// + /// Sync the in-memory into ScheduledJobStates. + /// Inserts missing rows, refreshes mutable fields on existing rows, but leaves + /// LastRunAt/NextRunAt alone (the tick path manages those). + /// + public static async Task ReconcileAsync( + BackgroundJobsDbContext db, + IReadOnlyList definitions, + DateTimeOffset now, + ILogger logger, + CancellationToken ct + ) + { + if (definitions.Count == 0) + return; + + var names = definitions.Select(d => d.Name).ToHashSet(StringComparer.Ordinal); + var existing = await db + .ScheduledJobStates.Where(s => names.Contains(s.Name)) + .ToDictionaryAsync(s => s.Name, StringComparer.Ordinal, ct); + + foreach (var def in definitions) + { + try + { + ReconcileOne(db, def, existing, now); + } +#pragma warning disable CA1031 + catch (Exception ex) +#pragma warning restore CA1031 + { + LogReconcileError(logger, def.Name, ex); + } + } + + await db.SaveChangesAsync(ct); + } + + private static void ReconcileOne( + BackgroundJobsDbContext db, + ScheduledJobDefinition def, + Dictionary existing, + DateTimeOffset now + ) + { + var payload = def.Payload is null + ? null + : JsonSerializer.Serialize(def.Payload, def.Payload.GetType()); + + if (!existing.TryGetValue(def.Name, out var row)) + { + var nextRun = TryGetNextOccurrence(def.CronExpression, def.TimeZoneId, now); + db.ScheduledJobStates.Add( + new ScheduledJobState + { + Name = def.Name, + JobTypeName = def.JobType.AssemblyQualifiedName!, + CronExpression = def.CronExpression, + TimeZoneId = def.TimeZoneId, + Payload = payload, + WithoutOverlapping = def.WithoutOverlapping, + OnOneServer = def.OnOneServer, + IsEnabled = true, + NextRunAt = nextRun, + CreatedAt = now, + UpdatedAt = now, + ConcurrencyStamp = Guid.NewGuid().ToString("N"), + } + ); + return; + } + + var jobType = def.JobType.AssemblyQualifiedName!; + var changed = + !string.Equals(row.JobTypeName, jobType, StringComparison.Ordinal) + || !string.Equals(row.CronExpression, def.CronExpression, StringComparison.Ordinal) + || !string.Equals(row.TimeZoneId, def.TimeZoneId, StringComparison.Ordinal) + || !string.Equals(row.Payload, payload, StringComparison.Ordinal) + || row.WithoutOverlapping != def.WithoutOverlapping + || row.OnOneServer != def.OnOneServer + || row.NextRunAt is null; + + if (!changed) + { + // Nothing to do; avoid a no-op UPDATE on every tick. + return; + } + + var cronOrTzChanged = + !string.Equals(row.CronExpression, def.CronExpression, StringComparison.Ordinal) + || !string.Equals(row.TimeZoneId, def.TimeZoneId, StringComparison.Ordinal); + + row.JobTypeName = jobType; + row.CronExpression = def.CronExpression; + row.TimeZoneId = def.TimeZoneId; + row.Payload = payload; + row.WithoutOverlapping = def.WithoutOverlapping; + row.OnOneServer = def.OnOneServer; + row.UpdatedAt = now; + + if (cronOrTzChanged || row.NextRunAt is null) + { + row.NextRunAt = TryGetNextOccurrence(def.CronExpression, def.TimeZoneId, now); + } + } + + private static DateTimeOffset? TryGetNextOccurrence( + string expression, + string timeZoneId, + DateTimeOffset now + ) + { + if (string.IsNullOrWhiteSpace(expression)) + return null; + return CronCalculator.GetNextOccurrence(expression, timeZoneId, now); + } + + [LoggerMessage( + Level = LogLevel.Error, + Message = "Scheduler reconcile failed for definition '{Name}'" + )] + private static partial void LogReconcileError(ILogger logger, string name, Exception ex); +} diff --git a/modules/BackgroundJobs/src/SimpleModule.BackgroundJobs/Scheduler/SchedulerService.cs b/modules/BackgroundJobs/src/SimpleModule.BackgroundJobs/Scheduler/SchedulerService.cs new file mode 100644 index 00000000..05d12cdc --- /dev/null +++ b/modules/BackgroundJobs/src/SimpleModule.BackgroundJobs/Scheduler/SchedulerService.cs @@ -0,0 +1,231 @@ +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using SimpleModule.BackgroundJobs.Contracts; +using SimpleModule.BackgroundJobs.Worker; + +namespace SimpleModule.BackgroundJobs.Scheduler; + +/// +/// Hosted service that turns code-declared scheduled jobs into enqueued +/// s. Ticks every : +/// reconciles in-memory definitions to the database, optionally acquires the +/// OnOneServer lease, then enqueues each due definition (respecting +/// the per-job WithoutOverlapping mutex). +/// +internal sealed partial class SchedulerService( + IServiceScopeFactory scopeFactory, + IScheduler registry, + WorkerIdentity identity, + IOptions options, + TimeProvider clock, + ILogger logger +) : BackgroundService +{ + private readonly SchedulerOptions _options = options.Value; + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + LogStarted(logger, identity.Id, _options.TickInterval); + + while (!stoppingToken.IsCancellationRequested) + { + try + { + await TickOnceAsync(stoppingToken); + } + catch (OperationCanceledException) + { + break; + } +#pragma warning disable CA1031 + catch (Exception ex) +#pragma warning restore CA1031 + { + LogTickError(logger, ex); + } + + try + { + await Task.Delay(_options.TickInterval, clock, stoppingToken); + } + catch (OperationCanceledException) + { + break; + } + } + } + + internal async Task TickOnceAsync(CancellationToken ct) + { + var definitions = registry.Definitions; + if (definitions.Count == 0) + return; + + await using var scope = scopeFactory.CreateAsyncScope(); + var sp = scope.ServiceProvider; + var now = clock.GetUtcNow(); + + var db = sp.GetRequiredService(); + await SchedulerReconciler.ReconcileAsync(db, definitions, now, logger, ct); + + // Only acquire a lease if at least one registered definition asked for it. + if (definitions.Any(d => d.OnOneServer)) + { + var leader = sp.GetRequiredService(); + var heldByMe = await leader.TryAcquireAsync( + SchedulerOptions.LeaseName, + identity.Id, + _options.LeaseTtl, + ct + ); + if (!heldByMe) + { + LogLeaseLost(logger, identity.Id); + return; + } + } + + var dueNames = await db + .ScheduledJobStates.Where(s => + s.IsEnabled && s.NextRunAt != null && s.NextRunAt <= now + ) + .Select(s => s.Name) + .ToListAsync(ct); + + if (dueNames.Count == 0) + return; + + var byName = definitions.ToDictionary(d => d.Name, StringComparer.Ordinal); + + foreach (var name in dueNames) + { + if (!byName.TryGetValue(name, out var def)) + { + // Definition was removed in code but row remains — skip silently. + continue; + } + + try + { + await ProcessOneAsync(name, def, now, ct); + } +#pragma warning disable CA1031 + catch (Exception ex) +#pragma warning restore CA1031 + { + LogDefinitionError(logger, name, ex); + } + } + } + + // Fresh scope per due definition so mutex/queue saves never flush other + // tracked state, and so a failure in one definition can't leave the + // outer DbContext with partially-modified rows. + private async Task ProcessOneAsync( + string name, + ScheduledJobDefinition def, + DateTimeOffset now, + CancellationToken ct + ) + { + await using var scope = scopeFactory.CreateAsyncScope(); + var sp = scope.ServiceProvider; + var db = sp.GetRequiredService(); + var state = await db.ScheduledJobStates.FirstOrDefaultAsync(s => s.Name == name, ct); + if (state is null) + return; + + if (state.WithoutOverlapping) + { + var mutex = sp.GetRequiredService(); + var acquired = await mutex.TryAcquireAsync( + MutexNameFor(state.Name), + identity.Id, + _options.MutexTtl, + ct + ); + if (!acquired) + { + // Skip this tick but advance NextRunAt so we don't busy-loop on a stuck mutex. + LogMutexSkip(logger, state.Name); + state.NextRunAt = CronCalculator.GetNextOccurrence( + state.CronExpression, + state.TimeZoneId, + now + ); + state.UpdatedAt = now; + await db.SaveChangesAsync(ct); + return; + } + } + + var jobId = Guid.NewGuid(); + var queue = sp.GetRequiredService(); + await queue.EnqueueAsync( + new JobQueueEntry( + jobId, + state.JobTypeName, + state.Payload, + now, + JobQueueEntryState.Pending, + 0, + state.CronExpression, + SchedulerOptions.ScheduledJobSentinel + state.Name, + now + ), + ct + ); + + state.LastRunAt = now; + state.NextRunAt = CronCalculator.GetNextOccurrence( + state.CronExpression, + state.TimeZoneId, + now + ); + state.UpdatedAt = now; + await db.SaveChangesAsync(ct); + + LogEnqueued(logger, state.Name, jobId, state.NextRunAt); + } + + internal static string MutexNameFor(string scheduledJobName) => + SchedulerOptions.MutexPrefix + scheduledJobName; + + [LoggerMessage( + Level = LogLevel.Information, + Message = "Scheduler started on {WorkerId} (tick={Tick})" + )] + private static partial void LogStarted(ILogger logger, string workerId, TimeSpan tick); + + [LoggerMessage(Level = LogLevel.Debug, Message = "Scheduler tick on {WorkerId} did not hold lease — skipping")] + private static partial void LogLeaseLost(ILogger logger, string workerId); + + [LoggerMessage( + Level = LogLevel.Information, + Message = "Enqueued scheduled job '{Name}' as {JobId}; next run {NextRunAt}" + )] + private static partial void LogEnqueued( + ILogger logger, + string name, + Guid jobId, + DateTimeOffset? nextRunAt + ); + + [LoggerMessage( + Level = LogLevel.Information, + Message = "Scheduled job '{Name}' skipped — mutex held" + )] + private static partial void LogMutexSkip(ILogger logger, string name); + + [LoggerMessage( + Level = LogLevel.Error, + Message = "Scheduler error processing definition '{Name}'" + )] + private static partial void LogDefinitionError(ILogger logger, string name, Exception ex); + + [LoggerMessage(Level = LogLevel.Error, Message = "Scheduler tick error")] + private static partial void LogTickError(ILogger logger, Exception ex); +} diff --git a/modules/BackgroundJobs/src/SimpleModule.BackgroundJobs/Services/BackgroundJobsService.cs b/modules/BackgroundJobs/src/SimpleModule.BackgroundJobs/Services/BackgroundJobsService.cs index 658cf1e7..8c2bc35a 100644 --- a/modules/BackgroundJobs/src/SimpleModule.BackgroundJobs/Services/BackgroundJobsService.cs +++ b/modules/BackgroundJobs/src/SimpleModule.BackgroundJobs/Services/BackgroundJobsService.cs @@ -1,9 +1,9 @@ // modules/BackgroundJobs/src/SimpleModule.BackgroundJobs/Services/BackgroundJobsService.cs using System.Text.Json; -using Cronos; using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.Logging; using SimpleModule.BackgroundJobs.Contracts; +using SimpleModule.BackgroundJobs.Scheduler; using static SimpleModule.BackgroundJobs.BackgroundJobsInternalConstants; namespace SimpleModule.BackgroundJobs.Services; @@ -81,12 +81,8 @@ CancellationToken ct ) where TJob : IModuleJob { - // Validate cron expression - var format = - cronExpression.Split(' ').Length > 5 ? CronFormat.IncludeSeconds : CronFormat.Standard; - var cron = CronExpression.Parse(cronExpression, format); var next = - cron.GetNextOccurrence(DateTime.UtcNow, inclusive: false) + CronCalculator.GetNextOccurrenceUtc(cronExpression, DateTime.UtcNow) ?? throw new InvalidOperationException( $"Cron '{cronExpression}' has no next occurrence." ); @@ -110,7 +106,7 @@ await queue.EnqueueAsync( id, jobType.AssemblyQualifiedName!, serialized, - new DateTimeOffset(next, TimeSpan.Zero), + next, JobQueueEntryState.Pending, 0, cronExpression, @@ -152,15 +148,9 @@ await db.JobQueueEntries.FirstOrDefaultAsync(e => e.Id == jobId, ct) if (isDisabled && row.CronExpression is not null) { - var format = - row.CronExpression.Split(' ').Length > 5 - ? CronFormat.IncludeSeconds - : CronFormat.Standard; - var cron = CronExpression.Parse(row.CronExpression, format); - var next = cron.GetNextOccurrence(DateTime.UtcNow, inclusive: false); - row.ScheduledAt = next.HasValue - ? new DateTimeOffset(next.Value, TimeSpan.Zero) - : DateTimeOffset.UtcNow; + row.ScheduledAt = + CronCalculator.GetNextOccurrenceUtc(row.CronExpression, DateTime.UtcNow) + ?? DateTimeOffset.UtcNow; } else { diff --git a/modules/BackgroundJobs/src/SimpleModule.BackgroundJobs/Worker/JobProcessorService.cs b/modules/BackgroundJobs/src/SimpleModule.BackgroundJobs/Worker/JobProcessorService.cs index 9452bf72..14b772c7 100644 --- a/modules/BackgroundJobs/src/SimpleModule.BackgroundJobs/Worker/JobProcessorService.cs +++ b/modules/BackgroundJobs/src/SimpleModule.BackgroundJobs/Worker/JobProcessorService.cs @@ -1,10 +1,10 @@ // modules/BackgroundJobs/src/SimpleModule.BackgroundJobs/Worker/JobProcessorService.cs -using Cronos; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using SimpleModule.BackgroundJobs.Contracts; +using SimpleModule.BackgroundJobs.Scheduler; using SimpleModule.BackgroundJobs.Services; namespace SimpleModule.BackgroundJobs.Worker; @@ -85,7 +85,11 @@ private async Task ExecuteEntryAsync(JobQueueEntry entry, CancellationToken ct) await queue.CompleteAsync(entry.Id, ct); LogCompleted(logger, entry.Id, jobType.Name); - if (entry.CronExpression is not null && entry.RecurringName is not null) + if (IsScheduledJobEntry(entry)) + { + await ReleaseSchedulerMutexAsync(scope.ServiceProvider, entry, ct); + } + else if (entry.CronExpression is not null && entry.RecurringName is not null) { await ScheduleNextRecurringAsync(queue, entry, ct); } @@ -99,6 +103,10 @@ private async Task ExecuteEntryAsync(JobQueueEntry entry, CancellationToken ct) #pragma warning restore CA1031 { LogJobError(logger, entry.Id, jobType.Name, ex); + if (IsScheduledJobEntry(entry)) + { + await ReleaseSchedulerMutexAsync(scope.ServiceProvider, entry, ct); + } if (entry.AttemptCount < _options.MaxAttempts) { var delay = TimeSpan.FromSeconds(_options.RetryBaseDelay.TotalSeconds * Math.Pow(2, entry.AttemptCount - 1)); @@ -116,16 +124,29 @@ private async Task ExecuteEntryAsync(JobQueueEntry entry, CancellationToken ct) } } + private static bool IsScheduledJobEntry(JobQueueEntry entry) => + entry.RecurringName is not null + && entry.RecurringName.StartsWith(SchedulerOptions.ScheduledJobSentinel, StringComparison.Ordinal); + + private static async Task ReleaseSchedulerMutexAsync( + IServiceProvider sp, + JobQueueEntry entry, + CancellationToken ct + ) + { + var mutex = sp.GetRequiredService(); + var scheduledName = entry.RecurringName![SchedulerOptions.ScheduledJobSentinel.Length..]; + await mutex.ReleaseAsync(SchedulerService.MutexNameFor(scheduledName), ct); + } + private static async Task ScheduleNextRecurringAsync(IJobQueue queue, JobQueueEntry entry, CancellationToken ct) { - var format = entry.CronExpression!.Split(' ').Length > 5 ? CronFormat.IncludeSeconds : CronFormat.Standard; - var cron = CronExpression.Parse(entry.CronExpression, format); - var next = cron.GetNextOccurrence(DateTime.UtcNow, inclusive: false); - if (!next.HasValue) return; + var next = CronCalculator.GetNextOccurrenceUtc(entry.CronExpression!, DateTime.UtcNow); + if (next is null) return; await queue.EnqueueAsync(new JobQueueEntry( Guid.NewGuid(), entry.JobTypeName, entry.SerializedData, - new DateTimeOffset(next.Value, TimeSpan.Zero), + next.Value, JobQueueEntryState.Pending, 0, entry.CronExpression, entry.RecurringName, DateTimeOffset.UtcNow), ct); } diff --git a/modules/BackgroundJobs/src/SimpleModule.BackgroundJobs/types.ts b/modules/BackgroundJobs/src/SimpleModule.BackgroundJobs/types.ts index c232f9ca..c473cd06 100644 --- a/modules/BackgroundJobs/src/SimpleModule.BackgroundJobs/types.ts +++ b/modules/BackgroundJobs/src/SimpleModule.BackgroundJobs/types.ts @@ -1,6 +1,14 @@ // Auto-generated from [Dto] types — do not edit -export interface ModuleJobRegistration { - jobType: any; +export interface ScheduledJobDto { + name: string; + jobType: string; + cronExpression: string; + timeZoneId: string; + withoutOverlapping: boolean; + onOneServer: boolean; + isEnabled: boolean; + lastRunAt: string | null; + nextRunAt: string | null; } export interface JobDetailDto { diff --git a/modules/BackgroundJobs/tests/SimpleModule.BackgroundJobs.Tests/Scheduler/DatabaseInstanceLeaderTests.cs b/modules/BackgroundJobs/tests/SimpleModule.BackgroundJobs.Tests/Scheduler/DatabaseInstanceLeaderTests.cs new file mode 100644 index 00000000..016e902d --- /dev/null +++ b/modules/BackgroundJobs/tests/SimpleModule.BackgroundJobs.Tests/Scheduler/DatabaseInstanceLeaderTests.cs @@ -0,0 +1,58 @@ +using BackgroundJobs.Tests.Helpers; +using FluentAssertions; +using Microsoft.Extensions.Logging.Abstractions; +using SimpleModule.BackgroundJobs.Scheduler; + +namespace SimpleModule.BackgroundJobs.Tests.Scheduler; + +public sealed class DatabaseInstanceLeaderTests : IDisposable +{ + private readonly TestDbContextFactory _factory = new(); + + [Fact] + public async Task FirstAcquirer_Wins() + { + using var db = _factory.Create(); + var leader = new DatabaseInstanceLeader(db, NullLogger.Instance); + (await leader.TryAcquireAsync("scheduler", "host-A", TimeSpan.FromMinutes(1))) + .Should() + .BeTrue(); + } + + [Fact] + public async Task SecondAcquirer_Fails_WhileLeaseHeld() + { + using var db1 = _factory.Create(); + using var db2 = _factory.Create(); + var l1 = new DatabaseInstanceLeader(db1, NullLogger.Instance); + var l2 = new DatabaseInstanceLeader(db2, NullLogger.Instance); + + (await l1.TryAcquireAsync("scheduler", "host-A", TimeSpan.FromMinutes(1))).Should().BeTrue(); + (await l2.TryAcquireAsync("scheduler", "host-B", TimeSpan.FromMinutes(1))).Should().BeFalse(); + } + + [Fact] + public async Task SameOwner_RenewsLease() + { + using var db = _factory.Create(); + var leader = new DatabaseInstanceLeader(db, NullLogger.Instance); + (await leader.TryAcquireAsync("scheduler", "host-A", TimeSpan.FromMinutes(1))).Should().BeTrue(); + (await leader.TryAcquireAsync("scheduler", "host-A", TimeSpan.FromMinutes(1))).Should().BeTrue(); + } + + [Fact] + public async Task NewOwner_TakesOver_AfterExpiry() + { + using var db = _factory.Create(); + var leader = new DatabaseInstanceLeader(db, NullLogger.Instance); + (await leader.TryAcquireAsync("scheduler", "host-A", TimeSpan.FromMilliseconds(1))).Should().BeTrue(); + await Task.Delay(50); + (await leader.TryAcquireAsync("scheduler", "host-B", TimeSpan.FromMinutes(1))).Should().BeTrue(); + } + + public void Dispose() + { + _factory.Dispose(); + GC.SuppressFinalize(this); + } +} diff --git a/modules/BackgroundJobs/tests/SimpleModule.BackgroundJobs.Tests/Scheduler/DatabaseJobMutexTests.cs b/modules/BackgroundJobs/tests/SimpleModule.BackgroundJobs.Tests/Scheduler/DatabaseJobMutexTests.cs new file mode 100644 index 00000000..65538d2a --- /dev/null +++ b/modules/BackgroundJobs/tests/SimpleModule.BackgroundJobs.Tests/Scheduler/DatabaseJobMutexTests.cs @@ -0,0 +1,75 @@ +using BackgroundJobs.Tests.Helpers; +using FluentAssertions; +using Microsoft.Extensions.Logging.Abstractions; +using SimpleModule.BackgroundJobs.Scheduler; + +namespace SimpleModule.BackgroundJobs.Tests.Scheduler; + +public sealed class DatabaseJobMutexTests : IDisposable +{ + private readonly TestDbContextFactory _factory = new(); + + [Fact] + public async Task TryAcquire_Succeeds_OnFirstCall() + { + using var db = _factory.Create(); + var mutex = new DatabaseJobMutex(db, NullLogger.Instance); + + var ok = await mutex.TryAcquireAsync("job", "owner-1", TimeSpan.FromMinutes(1)); + ok.Should().BeTrue(); + } + + [Fact] + public async Task TryAcquire_Fails_WhenHeldByAnotherOwner() + { + using var db1 = _factory.Create(); + using var db2 = _factory.Create(); + + var m1 = new DatabaseJobMutex(db1, NullLogger.Instance); + var m2 = new DatabaseJobMutex(db2, NullLogger.Instance); + + (await m1.TryAcquireAsync("job", "owner-1", TimeSpan.FromMinutes(1))).Should().BeTrue(); + (await m2.TryAcquireAsync("job", "owner-2", TimeSpan.FromMinutes(1))).Should().BeFalse(); + } + + [Fact] + public async Task TryAcquire_SameOwner_RefreshesLease() + { + using var db = _factory.Create(); + var mutex = new DatabaseJobMutex(db, NullLogger.Instance); + + (await mutex.TryAcquireAsync("job", "owner-1", TimeSpan.FromMinutes(1))).Should().BeTrue(); + (await mutex.TryAcquireAsync("job", "owner-1", TimeSpan.FromMinutes(1))).Should().BeTrue(); + } + + [Fact] + public async Task TryAcquire_OtherOwner_CanTakeOverAfterExpiry() + { + using var db = _factory.Create(); + var mutex = new DatabaseJobMutex(db, NullLogger.Instance); + + (await mutex.TryAcquireAsync("job", "owner-1", TimeSpan.FromMilliseconds(1))).Should().BeTrue(); + await Task.Delay(50); + (await mutex.TryAcquireAsync("job", "owner-2", TimeSpan.FromMinutes(1))).Should().BeTrue(); + } + + [Fact] + public async Task Release_RemovesMutexRow() + { + using var db = _factory.Create(); + var mutex = new DatabaseJobMutex(db, NullLogger.Instance); + + await mutex.TryAcquireAsync("job", "owner-1", TimeSpan.FromMinutes(1)); + await mutex.ReleaseAsync("job"); + + using var db2 = _factory.Create(); + var m2 = new DatabaseJobMutex(db2, NullLogger.Instance); + (await m2.TryAcquireAsync("job", "owner-2", TimeSpan.FromMinutes(1))).Should().BeTrue(); + } + + public void Dispose() + { + _factory.Dispose(); + GC.SuppressFinalize(this); + } +} diff --git a/modules/BackgroundJobs/tests/SimpleModule.BackgroundJobs.Tests/Scheduler/ScheduledJobBuilderTests.cs b/modules/BackgroundJobs/tests/SimpleModule.BackgroundJobs.Tests/Scheduler/ScheduledJobBuilderTests.cs new file mode 100644 index 00000000..c4c45210 --- /dev/null +++ b/modules/BackgroundJobs/tests/SimpleModule.BackgroundJobs.Tests/Scheduler/ScheduledJobBuilderTests.cs @@ -0,0 +1,125 @@ +using FluentAssertions; +using SimpleModule.BackgroundJobs.Contracts; + +namespace SimpleModule.BackgroundJobs.Tests.Scheduler; + +public sealed class ScheduledJobBuilderTests +{ + [Fact] + public void Cron_StoresExpression() + { + var registry = new SchedulerRegistry(); + registry.Job("a").Cron("0 8 * * MON-FRI"); + + var def = registry.Definitions.Single(); + def.CronExpression.Should().Be("0 8 * * MON-FRI"); + } + + [Theory] + [InlineData(1, "*/1 * * * *")] + [InlineData(5, "*/5 * * * *")] + [InlineData(30, "*/30 * * * *")] + public void EveryMinutes_RendersCron(int minutes, string expected) + { + var registry = new SchedulerRegistry(); + registry.Job("a").EveryMinutes(minutes); + registry.Definitions.Single().CronExpression.Should().Be(expected); + } + + [Theory] + [InlineData(0)] + [InlineData(60)] + public void EveryMinutes_RejectsOutOfRange(int minutes) + { + var registry = new SchedulerRegistry(); + var act = () => registry.Job("a").EveryMinutes(minutes); + act.Should().Throw(); + } + + [Fact] + public void Hourly_Daily_Weekdays_RenderExpectedCron() + { + var registry = new SchedulerRegistry(); + registry.Job("h").Hourly(); + registry.Job("d").Daily(); + registry.Job("w").Weekdays(); + + registry.Definitions.Single(d => d.Name == "h").CronExpression.Should().Be("0 * * * *"); + registry.Definitions.Single(d => d.Name == "d").CronExpression.Should().Be("0 0 * * *"); + registry + .Definitions.Single(d => d.Name == "w") + .CronExpression.Should() + .Be("0 0 * * MON-FRI"); + } + + [Theory] + [InlineData("02:00", "0 2 * * *")] + [InlineData("23:45", "45 23 * * *")] + public void DailyAt_RendersCron(string time, string expected) + { + var registry = new SchedulerRegistry(); + registry.Job("a").DailyAt(time); + registry.Definitions.Single().CronExpression.Should().Be(expected); + } + + [Theory] + [InlineData("garbage")] + [InlineData("25:00")] + [InlineData("2:00")] + public void DailyAt_RejectsBadTime(string time) + { + var registry = new SchedulerRegistry(); + var act = () => registry.Job("a").DailyAt(time); + act.Should().Throw(); + } + + [Fact] + public void Timezone_AcceptsKnownZone() + { + var registry = new SchedulerRegistry(); + registry.Job("a").Timezone("UTC"); + registry.Definitions.Single().TimeZoneId.Should().Be("UTC"); + } + + [Fact] + public void Timezone_RejectsUnknownZone() + { + var registry = new SchedulerRegistry(); + var act = () => registry.Job("a").Timezone("Not/A/Zone"); + act.Should().Throw(); + } + + [Fact] + public void Flags_AreCarriedOnDefinition() + { + var registry = new SchedulerRegistry(); + registry + .Job("a") + .Hourly() + .WithoutOverlapping() + .OnOneServer() + .WithPayload(new { x = 1 }); + + var def = registry.Definitions.Single(); + def.WithoutOverlapping.Should().BeTrue(); + def.OnOneServer.Should().BeTrue(); + def.Payload.Should().NotBeNull(); + } + + [Fact] + public void Job_DuplicateNameThrows() + { + var registry = new SchedulerRegistry(); + registry.Job("dup").Daily(); + var act = () => registry.Job("dup").Daily(); + act.Should().Throw(); + } + + [Fact] + public void Job_DefaultName_UsesJobTypeFullName() + { + var registry = new SchedulerRegistry(); + registry.Job().Daily(); + registry.Definitions.Single().Name.Should().Be(typeof(FakeJobA).FullName); + } +} diff --git a/modules/BackgroundJobs/tests/SimpleModule.BackgroundJobs.Tests/Scheduler/SchedulerServiceTickTests.cs b/modules/BackgroundJobs/tests/SimpleModule.BackgroundJobs.Tests/Scheduler/SchedulerServiceTickTests.cs new file mode 100644 index 00000000..3bac167c --- /dev/null +++ b/modules/BackgroundJobs/tests/SimpleModule.BackgroundJobs.Tests/Scheduler/SchedulerServiceTickTests.cs @@ -0,0 +1,188 @@ +using BackgroundJobs.Tests.Helpers; +using FluentAssertions; +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging.Abstractions; +using Microsoft.Extensions.Options; +using Microsoft.Extensions.Time.Testing; +using SimpleModule.BackgroundJobs.Contracts; +using SimpleModule.BackgroundJobs.Queue; +using SimpleModule.BackgroundJobs.Scheduler; +using SimpleModule.BackgroundJobs.Worker; + +namespace SimpleModule.BackgroundJobs.Tests.Scheduler; + +public sealed class SchedulerServiceTickTests : IDisposable +{ + private readonly TestDbContextFactory _factory = new(); + + [Fact] + public async Task Tick_EnqueuesDueJob() + { + await using var harness = CreateHarness(out var registry, "host-A"); + registry.Job("a").EveryMinutes(1); + + // First tick at T=0 — reconcile + maybe enqueue, depending on cron. + await harness.Service.TickOnceAsync(CancellationToken.None); + + // Advance the clock 90 seconds so NextRunAt is definitely in the past. + harness.Clock.Advance(TimeSpan.FromSeconds(90)); + await harness.Service.TickOnceAsync(CancellationToken.None); + + await using var verify = _factory.Create(); + var entries = await verify + .JobQueueEntries.Where(e => + e.RecurringName == SchedulerOptions.ScheduledJobSentinel + "a" + ) + .ToListAsync(); + entries.Should().NotBeEmpty(); + entries[0].JobTypeName.Should().Be(typeof(FakeJobA).AssemblyQualifiedName); + } + + [Fact] + public async Task Tick_SkipsWhenNextRunInFuture() + { + await using var harness = CreateHarness(out var registry, "host-A"); + registry.Job("a").Daily(); // next run hours away + + await harness.Service.TickOnceAsync(CancellationToken.None); + // Don't advance the clock — re-tick immediately. + await harness.Service.TickOnceAsync(CancellationToken.None); + + await using var verify = _factory.Create(); + var entries = await verify + .JobQueueEntries.Where(e => + e.RecurringName == SchedulerOptions.ScheduledJobSentinel + "a" + ) + .ToListAsync(); + entries.Should().BeEmpty(); + } + + [Fact] + public async Task Tick_ContinuesPastBadCronDefinition() + { + await using var harness = CreateHarness(out var registry, "host-A"); + registry.Job("bad").Cron("this is not a cron"); + registry.Job("good").EveryMinutes(1); + + await harness.Service.TickOnceAsync(CancellationToken.None); + harness.Clock.Advance(TimeSpan.FromSeconds(90)); + await harness.Service.TickOnceAsync(CancellationToken.None); + + await using var verify = _factory.Create(); + var goodEntries = await verify + .JobQueueEntries.Where(e => + e.RecurringName == SchedulerOptions.ScheduledJobSentinel + "good" + ) + .ToListAsync(); + goodEntries.Should().NotBeEmpty(); + } + + [Fact] + public async Task Tick_OnOneServer_OnlyOneHostEnqueues() + { + await using var harnessA = CreateHarness(out var registryA, "host-A"); + await using var harnessB = CreateHarness(out var registryB, "host-B"); + + registryA.Job("shared").EveryMinutes(1).OnOneServer(); + registryB.Job("shared").EveryMinutes(1).OnOneServer(); + + await harnessA.Service.TickOnceAsync(CancellationToken.None); + await harnessB.Service.TickOnceAsync(CancellationToken.None); + harnessA.Clock.Advance(TimeSpan.FromSeconds(90)); + harnessB.Clock.Advance(TimeSpan.FromSeconds(90)); + + await harnessA.Service.TickOnceAsync(CancellationToken.None); + await harnessB.Service.TickOnceAsync(CancellationToken.None); + + await using var verify = _factory.Create(); + var entries = await verify + .JobQueueEntries.Where(e => + e.RecurringName == SchedulerOptions.ScheduledJobSentinel + "shared" + ) + .ToListAsync(); + entries.Should().HaveCount(1); + } + + [Fact] + public async Task Tick_WithoutOverlapping_SkipsWhenMutexHeld() + { + await using var harness = CreateHarness(out var registry, "host-A"); + registry.Job("locked").EveryMinutes(1).WithoutOverlapping(); + + // Pre-acquire the mutex outside the scheduler so the tick can't take it. + await using (var pre = _factory.Create()) + { + var mutex = new DatabaseJobMutex(pre, NullLogger.Instance); + ( + await mutex.TryAcquireAsync( + SchedulerService.MutexNameFor("locked"), + "external-owner", + TimeSpan.FromMinutes(5) + ) + ).Should().BeTrue(); + } + + await harness.Service.TickOnceAsync(CancellationToken.None); + harness.Clock.Advance(TimeSpan.FromSeconds(90)); + await harness.Service.TickOnceAsync(CancellationToken.None); + + await using var verify = _factory.Create(); + var entries = await verify + .JobQueueEntries.Where(e => + e.RecurringName == SchedulerOptions.ScheduledJobSentinel + "locked" + ) + .ToListAsync(); + entries.Should().BeEmpty(); + } + + private SchedulerHarness CreateHarness(out SchedulerRegistry registry, string ownerId) + { + var clock = new FakeTimeProvider( + new DateTimeOffset(2026, 1, 1, 0, 0, 0, TimeSpan.Zero) + ); + var reg = new SchedulerRegistry(); + registry = reg; + + var sp = new ServiceCollection() + .AddScoped(_ => _factory.Create()) + .AddScoped() + .AddScoped() + .AddScoped() + .AddLogging() + .BuildServiceProvider(); + + var service = new SchedulerService( + sp.GetRequiredService(), + reg, + new WorkerIdentity(ownerId), + Options.Create(new SchedulerOptions { TickInterval = TimeSpan.FromSeconds(30) }), + clock, + NullLogger.Instance + ); + + return new SchedulerHarness(sp, service, clock); + } + + public void Dispose() + { + _factory.Dispose(); + GC.SuppressFinalize(this); + } + + private sealed class SchedulerHarness( + ServiceProvider sp, + SchedulerService service, + FakeTimeProvider clock + ) : IAsyncDisposable + { + public SchedulerService Service { get; } = service; + public FakeTimeProvider Clock { get; } = clock; + + public async ValueTask DisposeAsync() + { + Service.Dispose(); + await sp.DisposeAsync(); + } + } +} diff --git a/modules/BackgroundJobs/tests/SimpleModule.BackgroundJobs.Tests/Scheduler/TestJobs.cs b/modules/BackgroundJobs/tests/SimpleModule.BackgroundJobs.Tests/Scheduler/TestJobs.cs new file mode 100644 index 00000000..cc982231 --- /dev/null +++ b/modules/BackgroundJobs/tests/SimpleModule.BackgroundJobs.Tests/Scheduler/TestJobs.cs @@ -0,0 +1,21 @@ +using SimpleModule.BackgroundJobs.Contracts; + +namespace SimpleModule.BackgroundJobs.Tests.Scheduler; + +public sealed class FakeJobA : IModuleJob +{ + public Task ExecuteAsync(IJobExecutionContext context, CancellationToken cancellationToken) => + Task.CompletedTask; +} + +public sealed class FakeJobB : IModuleJob +{ + public Task ExecuteAsync(IJobExecutionContext context, CancellationToken cancellationToken) => + Task.CompletedTask; +} + +public sealed class FakeJobC : IModuleJob +{ + public Task ExecuteAsync(IJobExecutionContext context, CancellationToken cancellationToken) => + Task.CompletedTask; +} diff --git a/modules/BackgroundJobs/tests/SimpleModule.BackgroundJobs.Tests/SimpleModule.BackgroundJobs.Tests.csproj b/modules/BackgroundJobs/tests/SimpleModule.BackgroundJobs.Tests/SimpleModule.BackgroundJobs.Tests.csproj index 04aaaa92..9ee423a6 100644 --- a/modules/BackgroundJobs/tests/SimpleModule.BackgroundJobs.Tests/SimpleModule.BackgroundJobs.Tests.csproj +++ b/modules/BackgroundJobs/tests/SimpleModule.BackgroundJobs.Tests/SimpleModule.BackgroundJobs.Tests.csproj @@ -14,6 +14,7 @@ + diff --git a/tasks/task-scheduler-plan.md b/tasks/task-scheduler-plan.md new file mode 100644 index 00000000..12244b53 --- /dev/null +++ b/tasks/task-scheduler-plan.md @@ -0,0 +1,184 @@ +# Task Scheduler API (Issue #159) + +Branch: `task-scheduler` + +## Goal + +Layer a Laravel-style fluent task-scheduling API on top of the existing `BackgroundJobs` infrastructure so that modules can declare recurring jobs at startup time. The schema already has `ScheduledAt`/`CronExpression`; this work adds: + +1. A fluent registration surface (`IScheduler` / `IScheduledJob`). +2. A reconciler/poller hosted service that turns declared schedules into queued jobs. +3. Mutex (`WithoutOverlapping`) and leader (`OnOneServer`) primitives. +4. `sm jobs list-scheduled` CLI command. + +The existing **runtime-added** recurring-jobs path (`IBackgroundJobs.AddRecurringAsync`) is untouched. The new `IScheduler` API is the **declarative**, code-defined path. + +## Architecture + +``` +ConfigureServices time: Runtime (Consumer host): + + scheduler.Job() ┌────────────────────┐ + .DailyAt("02:00") ─────────► │ SchedulerRegistry │ (singleton) + .Timezone("UTC"); │ List │ + └─────────┬──────────┘ + │ + ▼ + ┌────────────────────┐ + │ SchedulerService │ tick every 30s + │ (IHostedService) │ + └─────────┬──────────┘ + │ + try-acquire LeaseAsync("scheduler") if OnOneServer + │ + ▼ + for each def: compute NextRunAt + if NextRunAt <= now: + if WithoutOverlapping: + mutex acquire by JobName + if held → skip + IJobQueue.EnqueueAsync(JobQueueEntry) + ScheduledJobState.LastRunAt = now + ScheduledJobState.NextRunAt = cron.GetNext(now) +``` + +## Phase 1 — Contracts + +`modules/BackgroundJobs/src/SimpleModule.BackgroundJobs.Contracts/` + +### New files + +- `IScheduler.cs` + + ```csharp + public interface IScheduler + { + IScheduledJob Job(string? name = null) where TJob : IModuleJob; + IReadOnlyList Definitions { get; } + } + ``` + +- `IScheduledJob.cs` + + ```csharp + public interface IScheduledJob where TJob : IModuleJob + { + IScheduledJob Cron(string expression); + IScheduledJob EveryMinutes(int minutes); + IScheduledJob Hourly(); + IScheduledJob Daily(); + IScheduledJob DailyAt(string time); // "HH:mm" + IScheduledJob Weekdays(); // Mon-Fri + IScheduledJob Timezone(string tz); // IANA id (UTC default) + IScheduledJob WithoutOverlapping(); + IScheduledJob OnOneServer(); + IScheduledJob WithPayload(object payload); + } + ``` + +- `ScheduledJobDefinition.cs` — mutable in-memory record returned by `Job()`. Fields: `Name`, `JobType`, `CronExpression`, `TimeZoneId`, `Payload`, `WithoutOverlapping`, `OnOneServer`. + +- `ScheduledJobDto.cs` — flat DTO for CLI / future endpoints (`[Dto]`). Fields: `Name`, `JobType`, `CronExpression`, `TimeZoneId`, `WithoutOverlapping`, `OnOneServer`, `LastRunAt`, `NextRunAt`, `IsEnabled`. + +### Update `IBackgroundJobsContracts` — no change required (the scheduler is its own contract, parallel to existing recurring API). + +## Phase 2 — Implementation + +`modules/BackgroundJobs/src/SimpleModule.BackgroundJobs/` + +### Schedule registry & DSL + +- `Scheduler/SchedulerRegistry.cs` — singleton implementing `IScheduler`. Owns `List`. +- `Scheduler/ScheduledJobBuilder.cs` — generic fluent builder implementing `IScheduledJob`. Each method mutates the underlying `ScheduledJobDefinition`. `DailyAt("HH:mm")` → cron `m H * * *`. `EveryMinutes(n)` → `*/n * * * *`. `Hourly` → `0 * * * *`. `Daily` → `0 0 * * *`. `Weekdays` → `0 0 * * MON-FRI`. +- Timezone validation via `TimeZoneInfo.FindSystemTimeZoneById` (cross-platform on .NET 10 — Windows IDs accepted via ICU mapping). + +### DB entities + +`Contracts/` (new): +- `ScheduledJobState.cs` — `Name` (PK), `JobTypeName`, `CronExpression`, `TimeZoneId`, `LastRunAt`, `NextRunAt`, `IsEnabled`, `WithoutOverlapping`, `OnOneServer`, `Payload`, `CreatedAt`, `UpdatedAt`. +- `JobMutex.cs` — `Name` (PK), `OwnerWorkerId`, `AcquiredAt`, `ExpiresAt`, `ConcurrencyStamp`. +- `JobLease.cs` — `Name` (PK), `OwnerWorkerId`, `AcquiredAt`, `ExpiresAt`, `ConcurrencyStamp`. + +All three are `[NoDtoGeneration]` since they're internal state. + +`EntityConfigurations/` (new): +- `ScheduledJobStateConfiguration.cs`, `JobMutexConfiguration.cs`, `JobLeaseConfiguration.cs`. + +Update `BackgroundJobsDbContext` to add three `DbSet<>`s and apply the configurations. + +### Scheduler service + +- `Scheduler/SchedulerService.cs` — `BackgroundService`, polls `BackgroundJobsWorkerOptions.SchedulerTickInterval` (default 30s). + Tick: + 1. Reconcile registry → `ScheduledJobState` (insert missing, update cron/timezone/options on existing, leave `LastRunAt` alone). Definitions removed in code do not auto-delete DB rows (keep `IsEnabled=true`; explicit cleanup is a future concern). + 2. For each `ScheduledJobState` where any definition has `OnOneServer=true`: try-acquire single `scheduler` lease via `IInstanceLeader` (TTL = 2× tick interval). If lease not held, skip the tick. + 3. For each enabled state where `NextRunAt <= now`: + - If `WithoutOverlapping`: try `IMutex.TryAcquire(Name, ttl=1h)`. If already held → skip and re-compute next run. Mutex is **not** released by the scheduler — the worker releases it on completion/failure (hook into existing `JobProcessorService`). + - Compute next-run via `CronExpression.GetNextOccurrence` using `TimeZoneInfo` for the definition's timezone. + - Build `JobQueueEntry` with `RecurringName = "schedule:" + def.Name` (sentinel — distinguishes "scheduled" from existing "recurring"), serialise payload. + - `IJobQueue.EnqueueAsync`; update `LastRunAt = now`, `NextRunAt = nextOccurrence`. Bump `UpdatedAt`. + 4. Per-definition `try/catch` → log + continue; never let one bad cron stop the loop. + +- `Scheduler/IInstanceLeader.cs` + `DatabaseInstanceLeader.cs` — `Task TryAcquireAsync(string name, TimeSpan ttl, string ownerId, ct)`. SQL: `UPDATE JobLeases SET Owner=@me, Acquired=@now, Expires=@now+ttl WHERE Name=@n AND (Owner=@me OR Expires < @now) [+ insert if missing]`. Postgres uses upsert; SQLite uses transaction + select-for-update analogue. + +- `Scheduler/IJobMutex.cs` + `DatabaseJobMutex.cs` — `Task TryAcquireAsync(...)`, `Task ReleaseAsync(name, ct)`. Same primitive as lease but explicit release. + +- Hook into worker: after `IModuleJob.ExecuteAsync` completes (success **or** failure), if `entry.RecurringName` starts with `"schedule:"` and corresponds to a `ScheduledJobState` with `WithoutOverlapping=true`, release the mutex. Implement by introducing a `ScheduleSentinel` static helper and small additions in `JobProcessorService.ExecuteEntryAsync`. + +### Module wiring + +`BackgroundJobsModule.cs`: +- Always register `SchedulerRegistry` as singleton + `IScheduler`. +- Always register `IJobMutex` + `IInstanceLeader` as scoped. +- In Consumer mode also register `SchedulerService` as `IHostedService`. +- Add `BackgroundJobsWorkerOptions.SchedulerTickInterval` (default 30s) + `SchedulerLeaseTtl` (default 60s). + +## Phase 3 — Tests + +`modules/BackgroundJobs/tests/SimpleModule.BackgroundJobs.Tests/Scheduler/`: + +- `ScheduledJobBuilderTests.cs` — DSL produces correct cron strings & flags; invalid timezone / cron rejected. +- `SchedulerServiceTickTests.cs` — uses `TestDbContextFactory` + a fake `IClock` (introduce `TimeProvider`). Cases: + - Definition with NextRunAt past now → job enqueued. + - Definition with NextRunAt future → no enqueue. + - Bad cron in one definition → others still processed. + - WithoutOverlapping: simulate mutex held → enqueue skipped. + - OnOneServer: two ticks from two `ownerId`s → only one enqueues. +- `DatabaseJobMutexTests.cs` — concurrent TryAcquire only succeeds once until TTL or release. +- `DatabaseInstanceLeaderTests.cs` — same shape as mutex but no explicit release. +- `SchedulerReconciliationTests.cs` — definition added → row inserted; cron changed in code → row updated; existing LastRunAt preserved. + +## Phase 4 — CLI + +`cli/SimpleModule.Cli/Commands/Jobs/`: +- `JobsListScheduledCommand.cs` + `JobsListScheduledSettings.cs` (with optional `--connection`). +- Loads `appsettings.json` from solution context, opens a connection to the configured DB, reads `ScheduledJobStates` (use raw ADO.NET to avoid pulling the whole EF graph into the CLI), pretty-prints via Spectre `Table` with columns: Name, Type, Cron, TZ, Next, Last, Enabled, Flags (Mutex/Single). +- Register branch `jobs` in `Program.cs`. + +## Phase 5 — Docs + +- `docs/scheduler.md` — overview, DSL reference table, semantics of `WithoutOverlapping`/`OnOneServer`, recommended tick interval, idempotency notes, sample registration in a module's `ConfigureServices`. +- Link from `docs/CONSTITUTION.md` if scheduler imposes any new rule (it doesn't — just a feature). + +## Verification + +- `dotnet build` clean (TreatWarningsAsErrors). +- `dotnet test --filter "FullyQualifiedName~BackgroundJobs"`. +- `npm run check` (formatting/linting unaffected). +- Manual smoke: register a 1-minute schedule in `SimpleModule.Host`, run host, observe job execution. +- Open PR closing #159. + +## Out of scope (for this PR) + +- A view-endpoint UI for browsing schedules (the existing `/admin/jobs/recurring` page covers the legacy path; a separate page for declarative schedules can come later). +- Distributed mutex with row-level locking on Postgres — initial implementation uses the same compare-and-set pattern that works on both providers; can be optimised later if needed. + +## Review + +- [x] Contracts: `IScheduler` + `IScheduledJob` + fluent builder + `ScheduledJobDefinition` + `ScheduledJobDto` + `ScheduledJobState`/`JobMutex`/`JobLease` entities + `SchedulerRegistry` + `AddScheduledJobs` DI extension. +- [x] Impl: `SchedulerService` (BackgroundService) reconciling registry → state and enqueueing due jobs every 30 s. Per-definition failure isolation. `WithoutOverlapping` via `IJobMutex` (release hooked into `JobProcessorService`). `OnOneServer` via `IInstanceLeader`. +- [x] EF: 3 new entity configurations applied, indices on `IsEnabled/NextRunAt`. +- [x] Tests: 31 new tests across `Scheduler/`. Coverage: DSL → cron rendering, validation; database mutex contention + TTL takeover; database leader election + renewal; tick path enqueues due, skips future, isolates bad cron, honours `WithoutOverlapping`, honours `OnOneServer`. +- [x] CLI: `sm jobs list-scheduled` reads `ScheduledJobStates` via raw ADO.NET against Sqlite or Postgres, falls back to `appsettings.json` for connection. +- [x] Docs: `docs/scheduler.md` with quick-start, DSL reference, semantics, configuration. +- [x] Build: `dotnet build` clean (TreatWarningsAsErrors). Full test suite green: **1015 tests passing, 0 failing**.