Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
<PackageVersion Include="Microsoft.CodeAnalysis.Analyzers" Version="4.14.0" />
<PackageVersion Include="Microsoft.CodeAnalysis.CSharp" Version="5.0.0" />
<!-- EF Core -->
<PackageVersion Include="Microsoft.Data.Sqlite" Version="10.0.3" />
<PackageVersion Include="Microsoft.EntityFrameworkCore" Version="10.0.3" />
<PackageVersion Include="Microsoft.EntityFrameworkCore.Sqlite" Version="10.0.3" />
<PackageVersion Include="Npgsql" Version="9.0.4" />
<PackageVersion Include="Microsoft.EntityFrameworkCore.Design" Version="10.0.3" />
<PackageVersion Include="Microsoft.EntityFrameworkCore.InMemory" Version="10.0.3" />
<PackageVersion Include="Npgsql.EntityFrameworkCore.PostgreSQL" Version="10.0.0" />
Expand Down
184 changes: 184 additions & 0 deletions cli/SimpleModule.Cli/Commands/Jobs/JobsListScheduledCommand.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
using System.Data.Common;
using System.Globalization;
using System.Text.Json;
using Microsoft.Data.Sqlite;
using Npgsql;
using SimpleModule.Cli.Infrastructure;
using Spectre.Console;
using Spectre.Console.Cli;

namespace SimpleModule.Cli.Commands.Jobs;

public sealed class JobsListScheduledCommand : Command<JobsListScheduledSettings>
{
public override int Execute(CommandContext context, JobsListScheduledSettings settings)
{
var (connection, provider) = ResolveConnection(settings);
if (connection is null)
{
AnsiConsole.MarkupLine(
"[red]No database connection. Pass --connection or run from a project with appsettings.json.[/]"
);
return 1;
}

try
{
using var conn = OpenConnection(provider, connection);
var rows = ReadSchedules(conn).ToList();
if (rows.Count == 0)
{
AnsiConsole.MarkupLine("[yellow]No scheduled jobs found.[/]");
return 0;
}

var table = new Table().RoundedBorder();
table.AddColumn("Name");
table.AddColumn("Job type");
table.AddColumn("Cron");
table.AddColumn("TZ");
table.AddColumn("Next run");
table.AddColumn("Last run");
table.AddColumn("Flags");

foreach (var row in rows.OrderBy(r => r.NextRunAt ?? DateTimeOffset.MaxValue))
{
var flags = new List<string>();
if (!row.IsEnabled)
flags.Add("[red]disabled[/]");
if (row.WithoutOverlapping)
flags.Add("mutex");
if (row.OnOneServer)
flags.Add("single");

table.AddRow(
Markup.Escape(row.Name),
Markup.Escape(ShortType(row.JobTypeName)),
Markup.Escape(row.CronExpression),
Markup.Escape(row.TimeZoneId),
FormatTimestamp(row.NextRunAt),
FormatTimestamp(row.LastRunAt),
string.Join(", ", flags)
);
}

AnsiConsole.Write(table);
AnsiConsole.MarkupLine($"\n[dim]{rows.Count} scheduled job(s)[/]");
return 0;
}
#pragma warning disable CA1031
catch (Exception ex)
#pragma warning restore CA1031
{
AnsiConsole.MarkupLine($"[red]Failed to read scheduled jobs:[/] {Markup.Escape(ex.Message)}");
return 1;
}
}

private static (string? Connection, string Provider) ResolveConnection(
JobsListScheduledSettings settings
)
{
if (!string.IsNullOrWhiteSpace(settings.ConnectionString))
{
return (settings.ConnectionString, settings.Provider ?? GuessProvider(settings.ConnectionString));
}

var solution = SolutionContext.Discover();
if (solution is null)
return (null, "Sqlite");

var path = Path.Combine(solution.RootPath, "template", "SimpleModule.Host", "appsettings.json");
if (!File.Exists(path))
{
var hostDir = Directory
.EnumerateFiles(solution.RootPath, "appsettings.json", SearchOption.AllDirectories)
.FirstOrDefault();
if (hostDir is null)
return (null, "Sqlite");
path = hostDir;
}

using var doc = JsonDocument.Parse(File.ReadAllText(path));
if (!doc.RootElement.TryGetProperty("Database", out var db))
return (null, "Sqlite");

var conn = db.TryGetProperty("DefaultConnection", out var cs) ? cs.GetString() : null;
var provider = db.TryGetProperty("Provider", out var p) ? p.GetString() : "Sqlite";
return (conn, provider ?? "Sqlite");
}

private static string GuessProvider(string connection) =>
connection.Contains("Data Source", StringComparison.OrdinalIgnoreCase) ? "Sqlite" : "Postgres";

private static DbConnection OpenConnection(string provider, string connectionString)
{
DbConnection conn = provider.Equals("Postgres", StringComparison.OrdinalIgnoreCase)
? new NpgsqlConnection(connectionString)
: new SqliteConnection(connectionString);
conn.Open();
return conn;
}

private static IEnumerable<ScheduleRow> ReadSchedules(DbConnection conn)
{
using var cmd = conn.CreateCommand();
cmd.CommandText = """
SELECT "Name", "JobTypeName", "CronExpression", "TimeZoneId",
"IsEnabled", "WithoutOverlapping", "OnOneServer",
"LastRunAt", "NextRunAt"
FROM "ScheduledJobStates"
""";
using var reader = cmd.ExecuteReader();
while (reader.Read())
{
yield return new ScheduleRow(
reader.GetString(0),
reader.GetString(1),
reader.GetString(2),
reader.GetString(3),
reader.GetBoolean(4),
reader.GetBoolean(5),
reader.GetBoolean(6),
ReadTimestamp(reader, 7),
ReadTimestamp(reader, 8)
);
}
}

private static DateTimeOffset? ReadTimestamp(DbDataReader reader, int ordinal)
{
if (reader.IsDBNull(ordinal))
return null;
var raw = reader.GetValue(ordinal);
return raw switch
{
DateTimeOffset dto => dto,
DateTime dt => new DateTimeOffset(DateTime.SpecifyKind(dt, DateTimeKind.Utc)),
string s when DateTimeOffset.TryParse(s, CultureInfo.InvariantCulture, out var parsed) => parsed,
_ => null,
};
}

private static string FormatTimestamp(DateTimeOffset? value) =>
value is null
? "[dim]—[/]"
: Markup.Escape(value.Value.ToString("yyyy-MM-dd HH:mm:ss 'UTC'", CultureInfo.InvariantCulture));

// Mirrors BackgroundJobsInternalConstants.GetShortTypeName — kept inline because
// the CLI doesn't reference the BackgroundJobs assembly.
private static string ShortType(string assemblyQualifiedName) =>
assemblyQualifiedName.Split(',')[0].Split('.').Last();

private sealed record ScheduleRow(
string Name,
string JobTypeName,
string CronExpression,
string TimeZoneId,
bool IsEnabled,
bool WithoutOverlapping,
bool OnOneServer,
DateTimeOffset? LastRunAt,
DateTimeOffset? NextRunAt
);
}
19 changes: 19 additions & 0 deletions cli/SimpleModule.Cli/Commands/Jobs/JobsListScheduledSettings.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
using System.ComponentModel;
using Spectre.Console.Cli;

namespace SimpleModule.Cli.Commands.Jobs;

public sealed class JobsListScheduledSettings : CommandSettings
{
[Description(
"Override the database connection string (e.g. \"Data Source=app.db\" or "
+ "\"Host=localhost;Database=app;Username=...;Password=...\"). "
+ "Defaults to Database:DefaultConnection from appsettings*.json."
)]
[CommandOption("-c|--connection <CONNECTION>")]
public string? ConnectionString { get; set; }

[Description("Database provider when --connection is supplied: Sqlite or Postgres.")]
[CommandOption("-p|--provider <PROVIDER>")]
public string? Provider { get; set; }
}
13 changes: 13 additions & 0 deletions cli/SimpleModule.Cli/Program.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using SimpleModule.Cli.Commands.Dev;
using SimpleModule.Cli.Commands.Doctor;
using SimpleModule.Cli.Commands.Install;
using SimpleModule.Cli.Commands.Jobs;
using SimpleModule.Cli.Commands.List;
using SimpleModule.Cli.Commands.Maintenance;
using SimpleModule.Cli.Commands.New;
Expand Down Expand Up @@ -62,6 +63,18 @@
.AddCommand<DoctorCommand>("doctor")
.WithDescription("Validate project structure and conventions");

config.AddBranch(
"jobs",
jobsBranch =>
{
jobsBranch.SetDescription("Inspect background-job state");
jobsBranch
.AddCommand<JobsListScheduledCommand>("list-scheduled")
.WithDescription("List declarative scheduled jobs with next/last run times")
.WithExample("jobs", "list-scheduled");
}
);

config.AddBranch(
"skill",
skillBranch =>
Expand Down
2 changes: 2 additions & 0 deletions cli/SimpleModule.Cli/SimpleModule.Cli.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="NetEscapades.EnumGenerators" />
<PackageReference Include="Microsoft.Data.Sqlite" />
<PackageReference Include="Npgsql" />
<PackageReference Include="Spectre.Console.Cli" />
</ItemGroup>
<ItemGroup>
Expand Down
140 changes: 140 additions & 0 deletions docs/scheduler.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
# Task Scheduler

The `BackgroundJobs` module exposes a fluent, code-declared scheduler so any
module can register recurring work at startup. Definitions live in C#, are
reconciled to the database at every tick, and end up as ordinary
`JobQueueEntry` rows that the worker picks up like any other job.

The scheduler is **declarative** — it complements (but does not replace) the
runtime API on `IBackgroundJobs` for ad-hoc recurring jobs created from
endpoints.

## Quick start

In your module's `ConfigureServices`, call `AddScheduledJobs`:

```csharp
using SimpleModule.BackgroundJobs.Contracts;

public class AuditLogsModule : IModule
{
public void ConfigureServices(IServiceCollection services, IConfiguration configuration)
{
services.AddModuleJob<NightlyAuditPurge>();
services.AddModuleJob<DailyDigestEmail>();

services.AddScheduledJobs(scheduler =>
{
scheduler.Job<NightlyAuditPurge>()
.DailyAt("02:00")
.Timezone("UTC")
.WithoutOverlapping();

scheduler.Job<DailyDigestEmail>("daily-digest")
.Cron("0 8 * * MON-FRI")
.Timezone("America/New_York")
.OnOneServer();
});
}
}
```

Each `scheduler.Job<T>()` registers a single declarative schedule. The job type
must implement `IModuleJob` and be registered with `AddModuleJob<T>()` so the
worker can resolve it from the DI container.

## DSL reference

| Method | Cron produced | Notes |
|---|---|---|
| `Cron("0 8 * * MON-FRI")` | as given | Standard 5-field cron; 6-field with seconds also supported |
| `EveryMinutes(15)` | `*/15 * * * *` | 1–59 only |
| `Hourly()` | `0 * * * *` | Top of each hour |
| `Daily()` | `0 0 * * *` | Midnight |
| `DailyAt("13:45")` | `45 13 * * *` | 24h `HH:mm` |
| `Weekdays()` | `0 0 * * MON-FRI` | Midnight Mon–Fri |
| `Timezone("UTC")` | — | IANA timezone for cron evaluation (default `UTC`) |
| `WithoutOverlapping()` | — | Skip if the prior run is still in-flight |
| `OnOneServer()` | — | Single elected host enqueues when many run the scheduler |
| `WithPayload(obj)` | — | Serialised to JSON for `IJobExecutionContext.Data` |

## Semantics

### Reconciliation

Every tick (default 30s — configurable via `BackgroundJobs:Scheduler:TickInterval`)
the scheduler upserts each in-memory definition into the `ScheduledJobStates`
table. Changing cron/timezone in code is applied on the next tick; `LastRunAt`
and `NextRunAt` are preserved across reconciles unless the cron actually
changed.

A definition removed from code does **not** automatically delete its row — the
row keeps its `IsEnabled` value, the scheduler ignores rows whose definitions
are no longer in the registry, and the worker won't enqueue them again. Manually
truncate the table if you need to purge orphans.

### `WithoutOverlapping()`

Backed by a per-job mutex row in `JobMutexes`. Before enqueueing, the scheduler
calls `IJobMutex.TryAcquireAsync(name)`; if the mutex is held by an in-flight
run, the tick *skips* enqueueing (and advances `NextRunAt`). When the worker
finishes (success or failure), it releases the mutex so the next tick can
re-enqueue.

Mutex TTL defaults to 1 hour; configure via `BackgroundJobs:Scheduler:MutexTtl`.
A stuck worker can't block a schedule forever — the TTL provides the safety
net.

### `OnOneServer()`

Backed by a single `scheduler` lease row in `JobLeases`. On every tick a host
tries to acquire the lease; only the holder runs the rest of the tick. Lease
TTL defaults to 1 minute (≈2× tick interval); set via
`BackgroundJobs:Scheduler:LeaseTtl`.

If a definition has `OnOneServer()` *anywhere* in the registry, the lease is
required for that host's tick. Hosts without the lease skip silently.

### Failure isolation

A definition with a bad cron expression is logged at `Error` and the tick
continues with the rest. The bad definition's `NextRunAt` stays `null` and no
work is enqueued for it until the cron is fixed.

## Inspecting state

The host writes `ScheduledJobStates`, `JobMutexes`, and `JobLeases` to the
database configured by `Database:DefaultConnection`. From the CLI:

```bash
sm jobs list-scheduled # uses appsettings.json
sm jobs list-scheduled --connection "Data Source=app.db" --provider Sqlite
sm jobs list-scheduled --connection "Host=...;Database=..." --provider Postgres
```

The command prints Name, Job type, Cron, TZ, Next run, Last run, and flags
(`mutex`, `single`, `disabled`).

## Worker mode

The hosted `SchedulerService` is registered as `IHostedService` only when the
module is in `Consumer` mode (the same condition that registers
`JobProcessorService`). Producer-only hosts still register `IScheduler` so
`AddScheduledJobs` succeeds — but no ticks run there.

## Configuration

```jsonc
{
"BackgroundJobs": {
"Scheduler": {
"TickInterval": "00:00:30",
"LeaseTtl": "00:01:00",
"MutexTtl": "01:00:00"
}
}
}
```

Sensible defaults are baked in; override only when tuning for a specific
deployment.
Loading
Loading