Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions app.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from pydantic import BaseModel, Field

from middleware import validate_api_key, track_usage, get_usage_stats, get_or_create_key, PLANS
from quality_scorer import score_structured_submission

app = FastAPI(
title="ContentSplit",
Expand Down Expand Up @@ -57,6 +58,23 @@ class RepurposeResponse(BaseModel):
created_at: str


class QualityScoreRequest(BaseModel):
    # Request body accepted by POST /api/quality-score.

    # The payload to score; required (no default).
    submission: object = Field(
        ...,
        description="Structured submission to score. Accepts JSON, markdown, code, or text.",
    )
    # Optional format the caller expects the submission to be in.
    expected_format: Optional[str] = Field(
        default=None,
        description="Expected format: json, markdown, code, or text",
    )
    # Rubric phrases whose key terms should appear in the submission.
    requirements: list[str] = Field(
        default=[],
        description="Rubric requirements or key concepts to cover",
    )
    # Section names/headings the submission is expected to contain.
    required_sections: list[str] = Field(
        default=[],
        description="Required section names or headings",
    )
    # Minimum weighted score (0..1 inclusive) counted as a pass.
    pass_threshold: float = Field(default=0.70, ge=0, le=1)


class QualityScoreResponse(BaseModel):
    # Scorecard returned by POST /api/quality-score.

    # Overall score combining the per-dimension scores.
    weighted_score: float
    # Label derived from the weighted score (e.g. "good", "poor").
    quality_rating: str
    # Per-dimension scores keyed by dimension name.
    scores: dict
    # Human-readable notes about the detected format and weak dimensions.
    feedback: list[str]
    # Whether the weighted score met the requested pass threshold.
    pass_threshold: bool
    # Format the scorer detected for the submission.
    detected_format: str


# ── Content Generation (using prompts, model-agnostic) ────────────────────

PLATFORM_PROMPTS = {
Expand Down Expand Up @@ -376,6 +394,19 @@ async def repurpose_content(req: RepurposeRequest, user: dict = Depends(validate
)


@app.post("/api/quality-score", response_model=QualityScoreResponse)
async def quality_score(req: QualityScoreRequest, user: dict = Depends(validate_api_key)):
    """Score a structured submission against a deterministic quality rubric."""
    # Record the call against the caller's key before doing any scoring work;
    # callers without a "key" entry are tracked as "anonymous".
    caller_key = user.get("key", "anonymous")
    track_usage(caller_key)

    # Forward the validated request fields unchanged to the scorer.
    scoring_options = {
        "expected_format": req.expected_format,
        "requirements": req.requirements,
        "required_sections": req.required_sections,
        "pass_threshold": req.pass_threshold,
    }
    return score_structured_submission(req.submission, **scoring_options)


@app.get("/api/platforms")
async def list_platforms():
"""List available target platforms."""
Expand Down
288 changes: 288 additions & 0 deletions quality_scorer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,288 @@
"""
Structured submission quality scoring.

This module intentionally uses deterministic heuristics instead of external
models so scorecards are fast, repeatable, and available in offline/test runs.
"""

import ast
import json
import math
import re
from dataclasses import dataclass, field
from typing import Any


# Contribution of each scoring dimension to the weighted score.
WEIGHTS = dict(
    completeness=0.30,
    format_compliance=0.20,
    coverage=0.25,
    clarity=0.15,
    validity=0.10,
)

# Instruction-style words dropped when extracting key terms from requirements.
REQUIREMENT_STOPWORDS = set(
    "add cover include mention provide return show support with".split()
)

# (minimum score, label) pairs, ordered from the highest threshold down.
QUALITY_RATINGS = (
    (0.85, "excellent"),
    (0.70, "good"),
    (0.50, "fair"),
    (0.00, "poor"),
)


@dataclass
class ScoreConfig:
expected_format: str | None = None
requirements: list[str] = field(default_factory=list)
required_sections: list[str] = field(default_factory=list)
pass_threshold: float = 0.70


def normalize_submission(submission: Any) -> tuple[str, str, Any | None]:
"""Return text, detected format, and parsed value when available."""
if isinstance(submission, (dict, list)):
return json.dumps(submission, ensure_ascii=False), "json", submission

text = str(submission or "").strip()
if not text:
return "", "text", None

try:
parsed = json.loads(text)
if isinstance(parsed, (dict, list)):
return text, "json", parsed
except json.JSONDecodeError:
pass

if _looks_like_markdown(text):
return text, "markdown", None
if _looks_like_code(text):
return text, "code", None
return text, "text", None


def score_structured_submission(
    submission: Any,
    *,
    expected_format: str | None = None,
    requirements: list[str] | None = None,
    required_sections: list[str] | None = None,
    pass_threshold: float = 0.70,
) -> dict[str, Any]:
    """Build a scorecard for ``submission`` across the five rubric dimensions.

    Args:
        submission: Raw submission; normalized via ``normalize_submission``.
        expected_format: Requested format, compared case-insensitively.
        requirements: Rubric phrases; falsy entries are dropped.
        required_sections: Section names; falsy entries are dropped.
        pass_threshold: Weighted score at or above which the result passes.

    Returns:
        A dict with ``weighted_score``, ``quality_rating``, ``scores``,
        ``feedback``, ``pass_threshold`` (bool) and ``detected_format``.
    """
    wanted_format = expected_format.lower() if expected_format else None
    config = ScoreConfig(
        expected_format=wanted_format,
        requirements=list(filter(None, requirements or [])),
        required_sections=list(filter(None, required_sections or [])),
        pass_threshold=pass_threshold,
    )
    text, detected_format, parsed = normalize_submission(submission)

    scores = {
        "completeness": _score_completeness(text, parsed, config),
        "format_compliance": _score_format_compliance(text, detected_format, config),
        "coverage": _score_coverage(text, config),
        "clarity": _score_clarity(text, detected_format, parsed),
        "validity": _score_validity(text, detected_format, parsed),
    }

    # Combine the dimensions in WEIGHTS order, then round once at the end.
    weighted_total = sum(scores[name] * WEIGHTS[name] for name in WEIGHTS)
    weighted_score = round(weighted_total, 4)
    rounded_scores = {name: round(raw, 4) for name, raw in scores.items()}

    return {
        "weighted_score": weighted_score,
        "quality_rating": _quality_rating(weighted_score),
        "scores": rounded_scores,
        "feedback": _build_feedback(scores, detected_format, config),
        "pass_threshold": weighted_score >= config.pass_threshold,
        "detected_format": detected_format,
    }


def _score_completeness(text: str, parsed: Any | None, config: ScoreConfig) -> float:
    """Score how complete the submission is, in the range 0..1.

    Blends three components: text length (35%, saturating at 1200
    characters), structural richness (35%) and presence of the required
    sections (30%).

    Args:
        text: Normalized submission text.
        parsed: Decoded dict/list for JSON submissions, otherwise ``None``.
        config: Scoring options; only ``required_sections`` is read here.

    Returns:
        ``0.0`` for empty text, otherwise the clamped blended score.
    """
    if not text:
        return 0.0

    length_score = min(len(text) / 1200, 1.0)
    structure_score = 0.55  # baseline for plain prose with no detectable structure
    if isinstance(parsed, dict):
        # Share of keys whose value is not an "empty" placeholder.
        filled = sum(1 for value in parsed.values() if value not in (None, "", [], {}))
        structure_score = filled / max(len(parsed), 1)
    elif isinstance(parsed, list):
        structure_score = min(len(parsed) / 5, 1.0)
    elif _looks_like_markdown(text):
        # Count heading/bullet lines. Anchoring on line starts (rather than on
        # a preceding "\n") also counts a marker on the very first line, which
        # the stripped text would otherwise always lose.
        markers = len(re.findall(r"^(?:#|- |\* )", text, flags=re.MULTILINE))
        structure_score = min(markers / 8, 1.0)
    elif _looks_like_code(text):
        structure_score = 0.75 if len(text.splitlines()) >= 3 else 0.45

    section_score = _required_section_score(text, config.required_sections)
    return _clamp((length_score * 0.35) + (structure_score * 0.35) + (section_score * 0.30))


def _score_format_compliance(text: str, detected_format: str, config: ScoreConfig) -> float:
    """Score how well the detected format matches the expected one (0..1).

    Empty text scores 0.0. With no expected format, any recognised format
    scores 1.0 and anything else 0.6. An exact match scores 1.0; a few
    near-misses earn partial credit; every other mismatch scores 0.2.
    """
    if not text:
        return 0.0

    expected = config.expected_format
    if not expected:
        recognised = detected_format in {"json", "markdown", "code", "text"}
        return 1.0 if recognised else 0.6

    if expected == detected_format:
        return 1.0

    # (expected, detected) pairs that are close enough for partial credit.
    partial_credit = {
        ("text", "markdown"): 0.75,
        ("text", "code"): 0.75,
        ("markdown", "text"): 0.45,
    }
    return partial_credit.get((expected, detected_format), 0.2)


def _score_coverage(text: str, config: ScoreConfig) -> float:
    """Score how much of the rubric the submission covers (0..1).

    When the requirements yield key terms, the score is the share of those
    terms found (as substrings) in the lower-cased text. Otherwise it falls
    back to a vocabulary heuristic mixing word variety and word count.
    """
    if not text:
        return 0.0

    lowered = text.lower()
    terms = _important_terms(config.requirements)
    if terms:
        hits = [term for term in terms if term in lowered]
        return _clamp(len(hits) / len(terms))

    # No usable rubric terms: judge by vocabulary richness instead.
    words = re.findall(r"[A-Za-z0-9_'-]{3,}", lowered)
    if not words:
        return 0.0
    unique_ratio = len(set(words)) / len(words)
    detail_score = min(len(words) / 180, 1.0)
    return _clamp((unique_ratio * 0.45) + (detail_score * 0.55))


def _score_clarity(text: str, detected_format: str = "text", parsed: Any | None = None) -> float:
if not text:
return 0.0

if detected_format == "json":
if isinstance(parsed, dict):
named_fields = sum(1 for key in parsed if str(key).strip())
filled_fields = sum(1 for value in parsed.values() if value not in (None, "", [], {}))
return _clamp(0.55 + (named_fields / max(len(parsed), 1) * 0.20) + (filled_fields / max(len(parsed), 1) * 0.25))
if isinstance(parsed, list):
return _clamp(0.65 + min(len(parsed) / 10, 0.30))

sentences = [s for s in re.split(r"[.!?]\s+", text) if s.strip()]
words = re.findall(r"[A-Za-z0-9_'-]+", text)
avg_sentence = len(words) / max(len(sentences), 1)
sentence_score = 1.0 - min(abs(avg_sentence - 18) / 35, 0.65)
structure_bonus = 0.15 if re.search(r"(^|\n)(#{1,6}\s|\s*[-*]\s|\d+\.\s)", text) else 0.0
repetition_penalty = _repetition_penalty(words)
return _clamp(sentence_score + structure_bonus - repetition_penalty)


def _score_validity(text: str, detected_format: str, parsed: Any | None) -> float:
if not text:
return 0.0
if _contains_placeholders(text):
return 0.45
if detected_format == "json":
return 1.0 if parsed is not None else 0.25
if detected_format == "code":
try:
ast.parse(text)
return 1.0
except SyntaxError:
return 0.65 if _balanced_delimiters(text) else 0.35
if detected_format == "markdown":
return 0.85 if _balanced_markdown(text) else 0.55
return 0.80 if len(text.split()) >= 5 else 0.45


def _build_feedback(scores: dict[str, float], detected_format: str, config: ScoreConfig) -> list[str]:
    """Assemble the feedback lines for a scorecard.

    The first line always reports the detected format. A format-mismatch line
    follows when an expected format was given and differs. Every dimension
    scoring below 0.60 then contributes its advice, weakest first. If nothing
    beyond the first line was added, a positive summary is appended.
    """
    notes = [f"Detected format: {detected_format}."]

    expected = config.expected_format
    if expected and expected != detected_format:
        notes.append(f"Expected {expected}, but detected {detected_format}.")

    # Stable sort: ties keep the original dimension order.
    weakest_first = sorted(scores, key=scores.get)
    notes.extend(
        _dimension_feedback(name) for name in weakest_first if scores[name] < 0.60
    )

    if len(notes) == 1:
        notes.append("Submission is well-structured and satisfies the rubric.")
    return notes


def _dimension_feedback(dimension: str) -> str:
    """Return the improvement advice for a scoring dimension.

    Raises:
        KeyError: If ``dimension`` is not one of the five known dimensions.
    """
    advice_by_dimension = dict(
        completeness="Add more required details or fill empty fields.",
        format_compliance="Match the requested output format more closely.",
        coverage="Address more rubric requirements and key concepts.",
        clarity="Use clearer sentence structure, headings, or bullets.",
        validity="Remove placeholders and fix invalid syntax or structure.",
    )
    return advice_by_dimension[dimension]


def _quality_rating(score: float) -> str:
    """Map a weighted score to the first rating whose threshold it meets.

    Falls back to "poor" when no threshold in QUALITY_RATINGS is met.
    """
    matching_labels = (label for floor, label in QUALITY_RATINGS if score >= floor)
    return next(matching_labels, "poor")


def _looks_like_markdown(text: str) -> bool:
    """Tell whether any line starts with a markdown marker.

    Recognised markers at the start of the text or of a line: a heading
    (1-6 ``#`` plus whitespace), a ``-``/``*`` bullet, a numbered item, or a
    code fence.
    """
    marker = re.search(r"(^|\n)(#{1,6}\s|[-*]\s|\d+\.\s|```)", text)
    return marker is not None


def _looks_like_code(text: str) -> bool:
    """Tell whether the text contains a code keyword or a brace/semicolon.

    Keywords are matched as whole words; any ``{``, ``}`` or ``;`` anywhere
    in the text also counts.
    """
    hint = re.search(r"\b(def|class|import|from|function|const|let|var)\b|[{};]", text)
    return hint is not None


def _required_section_score(text: str, sections: list[str]) -> float:
    """Return the share of required sections mentioned in the text.

    Matching is a case-insensitive substring test. With no required sections
    a neutral 0.75 is returned.
    """
    if not sections:
        return 0.75

    haystack = text.lower()
    found = [name for name in sections if name.lower() in haystack]
    return len(found) / len(sections)


def _important_terms(requirements: list[str]) -> list[str]:
    """Extract the sorted, de-duplicated key terms from rubric requirements.

    A term is a lower-cased run of at least four word characters
    (letters, digits, ``_``, ``'``, ``-``) that is not a requirement stopword.
    """
    unique_terms = {
        word
        for requirement in requirements
        for word in re.findall(r"[A-Za-z0-9_'-]{4,}", requirement.lower())
        if word not in REQUIREMENT_STOPWORDS
    }
    return sorted(unique_terms)


def _repetition_penalty(words: list[str]) -> float:
    """Return a penalty (0..0.25) for over-used words.

    Texts shorter than 20 words are never penalised. Only words longer than
    three characters are tallied (case-insensitively); each occurrence beyond
    the second counts as one repetition, and the total is taken relative to
    the overall word count.
    """
    total = len(words)
    if total < 20:
        return 0.0

    tally: dict[str, int] = {}
    for raw in words:
        token = raw.lower()
        if len(token) <= 3:
            continue
        tally[token] = tally.get(token, 0) + 1

    excess = sum(seen - 2 for seen in tally.values() if seen > 2)
    return min(excess / max(total, 1), 0.25)


def _contains_placeholders(text: str) -> bool:
    """Tell whether the text contains placeholder wording.

    Looks, case-insensitively and on word boundaries, for "todo", "tbd",
    "lorem ipsum", "placeholder" or "your text here".
    """
    hit = re.search(r"\b(todo|tbd|lorem ipsum|placeholder|your text here)\b", text, re.I)
    return hit is not None


def _balanced_markdown(text: str) -> bool:
    """Tell whether every code fence in the text has a partner."""
    fence_count = text.count("```")
    # An odd number of fences means one was opened but never closed.
    return fence_count % 2 == 0


def _balanced_delimiters(text: str) -> bool:
    """Tell whether ``()``, ``[]`` and ``{}`` are properly paired and nested.

    Every closing delimiter must match the most recently opened one, and
    nothing may be left open at the end. Comparing counts alone would accept
    inputs such as ``")("`` or ``"([)]"``, which are not balanced.

    Args:
        text: Source text to inspect; all other characters are ignored.

    Returns:
        ``True`` when all delimiters are balanced (including when there are
        none), ``False`` otherwise.
    """
    closer_for = {"(": ")", "[": "]", "{": "}"}
    closers = set(closer_for.values())

    # Stack of the closing characters still owed, innermost last.
    pending: list[str] = []
    for char in text:
        if char in closer_for:
            pending.append(closer_for[char])
        elif char in closers:
            if not pending or pending.pop() != char:
                return False
    return not pending


def _clamp(value: float) -> float:
    """Limit ``value`` to the range 0.0..1.0, mapping NaN to 0.0."""
    if math.isnan(value):
        return 0.0
    capped = min(1.0, value)
    return max(0.0, capped)
46 changes: 46 additions & 0 deletions sample_scorecards.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
[
{
"name": "complete-json-submission",
"scorecard": {
"weighted_score": 0.7753,
"quality_rating": "good",
"scores": {
"completeness": 0.7273,
"format_compliance": 1.0,
"coverage": 0.4286,
"clarity": 1.0,
"validity": 1.0
},
"feedback": [
"Detected format: json.",
"Address more rubric requirements and key concepts."
],
"pass_threshold": true,
"detected_format": "json"
}
},
{
"name": "placeholder-text-submission",
"scorecard": {
"weighted_score": 0.3022,
"quality_rating": "poor",
"scores": {
"completeness": 0.4239,
"format_compliance": 0.2,
"coverage": 0.0,
"clarity": 0.6,
"validity": 0.45
},
"feedback": [
"Detected format: text.",
"Expected json, but detected text.",
"Address more rubric requirements and key concepts.",
"Match the requested output format more closely.",
"Add more required details or fill empty fields.",
"Remove placeholders and fix invalid syntax or structure."
],
"pass_threshold": false,
"detected_format": "text"
}
}
]