Skip to content

feat: Add Entity Aggregations#570

Draft
SandeepTuniki wants to merge 1 commit into
masterfrom
entity-aggregations-port
Draft

feat: Add Entity Aggregations#570
SandeepTuniki wants to merge 1 commit into
masterfrom
entity-aggregations-port

Conversation

@SandeepTuniki

@SandeepTuniki SandeepTuniki commented Jun 22, 2026

Copy link
Copy Markdown
Contributor

Work in Progress. Do not review.

- Created EntityAggregationGenerator to replicate C++ Flume entity aggregation logic using BigQuery Federation and Spanner EXPORT DATA.
- Implemented MCF constraint parsing, multi-statement SQL pipeline, deterministic SV hashing, and dynamic SV naming.
- Added thorough E2E integration tests in aggregation_e2e_test.py covering ranges, wildcards, default dates, and multi-constraints in both Base and Custom DC modes.
- Relocated test classes to the middle of the file to prevent git merge conflicts.
@codacy-production

Copy link
Copy Markdown

Not up to standards ⛔

🔴 Issues 4 high · 22 medium · 18 minor

Alerts:
⚠ 44 issues (≤ 0 issues of at least minor severity)

Results:
44 new issues

Category Results
UnusedCode 1 medium
4 minor
Documentation 4 minor
ErrorProne 4 high
Security 15 medium
CodeStyle 10 minor
Complexity 6 medium

View in Codacy

🟢 Metrics 81 complexity

Metric Results
Complexity 81

View in Codacy

NEW Get contextual insights on your PRs based on Codacy's metrics, along with PR and Jira context, without leaving GitHub. Enable AI reviewer
TIP This summary will be updated as you push new changes.

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces the EntityAggregationGenerator and EntityAggregationConfig classes to perform entity aggregations using BigQuery Federation, along with comprehensive integration tests. The review feedback highlights several critical security and robustness improvements for the SQL generation logic. Specifically, it recommends using safe, index-based column names instead of raw MCF-style property names to prevent SQL syntax errors and injection vulnerabilities, escaping dynamic properties like location and date, removing dead code blocks, and adding defensive validation for configuration inputs.

Comment on lines +69 to +119
def _parse_constraints(self, constraints: List[str]) -> List[Dict]:
"""Parses MCF-style constraints into structured metadata and SQL clauses."""
parsed = []
for c in constraints:
if not c or ':' not in c:
continue
prop, val_str = [part.strip() for part in c.split(':', 1)]

# 1. Wildcard constraint
if val_str == '*':
parsed.append({
'prop': prop,
'is_wildcard': True,
'sql_filter': None,
'val_str': val_str
})
continue

# 2. Range constraint [Low - Unit] (e.g., [7 - M])
range_low_match = re.match(r'^\[\s*([\d\.]+)\s*-\s*(\w+)\s*\]$', val_str)
if range_low_match:
low = range_low_match.group(1)
parsed.append({
'prop': prop,
'is_wildcard': False,
'sql_filter': f"SAFE_CAST({prop}_val AS FLOAT64) >= {low}",
'val_str': val_str
})
continue

# 3. Range constraint [Low High Unit] (e.g., [3 4 M])
range_bounds_match = re.match(r'^\[\s*([\d\.]+)\s+([\d\.]+)\s+(\w+)\s*\]$', val_str)
if range_bounds_match:
low = range_bounds_match.group(1)
high = range_bounds_match.group(2)
parsed.append({
'prop': prop,
'is_wildcard': False,
'sql_filter': f"SAFE_CAST({prop}_val AS FLOAT64) >= {low} AND SAFE_CAST({prop}_val AS FLOAT64) <= {high}",
'val_str': val_str
})
continue

# 4. Simple literal value constraint (e.g., magnitudeType: MagnitudeMl)
parsed.append({
'prop': prop,
'is_wildcard': False,
'sql_filter': f"{prop}_val = '{_escape_sql_literal(val_str)}'",
'val_str': val_str
})
return parsed

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

MCF-style property names (such as magnitude-type or properties containing special characters) are not always valid unquoted SQL identifiers. Using them directly as column names or temporary table names can lead to SQL syntax errors or potential SQL injection vulnerabilities.

We should use safe, index-based column names (e.g., val_0, val_1) for the temporary tables and query columns, while keeping the original property name for the Spanner predicate matching (escaped using _escape_sql_literal).

    def _parse_constraints(self, constraints: List[str]) -> List[Dict]:
        """Parses MCF-style constraints into structured metadata and SQL clauses."""
        parsed = []
        for i, c in enumerate(constraints):
            if not c or ':' not in c:
                continue
            prop, val_str = [part.strip() for part in c.split(':', 1)]
            col_name = f"val_{i}"
            
            # 1. Wildcard constraint
            if val_str == '*':
                parsed.append({
                    'prop': prop,
                    'col_name': col_name,
                    'is_wildcard': True,
                    'sql_filter': None,
                    'val_str': val_str
                })
                continue

            # 2. Range constraint [Low - Unit] (e.g., [7 - M])
            range_low_match = re.match(r'^\[\s*([\d\.]+)\s*-\s*(\w+)\s*\]$', val_str)
            if range_low_match:
                low = range_low_match.group(1)
                parsed.append({
                    'prop': prop,
                    'col_name': col_name,
                    'is_wildcard': False,
                    'sql_filter': f"SAFE_CAST({col_name} AS FLOAT64) >= {low}",
                    'val_str': val_str
                })
                continue

            # 3. Range constraint [Low High Unit] (e.g., [3 4 M])
            range_bounds_match = re.match(r'^\[\s*([\d\.]+)\s+([\d\.]+)\s+(\w+)\s*\]$', val_str)
            if range_bounds_match:
                low = range_bounds_match.group(1)
                high = range_bounds_match.group(2)
                parsed.append({
                    'prop': prop,
                    'col_name': col_name,
                    'is_wildcard': False,
                    'sql_filter': f"SAFE_CAST({col_name} AS FLOAT64) >= {low} AND SAFE_CAST({col_name} AS FLOAT64) <= {high}",
                    'val_str': val_str
                })
                continue

            # 4. Simple literal value constraint (e.g., magnitudeType: MagnitudeMl)
            parsed.append({
                'prop': prop,
                'col_name': col_name,
                'is_wildcard': False,
                'sql_filter': f"{col_name} = '{_escape_sql_literal(val_str)}'",
                'val_str': val_str
            })
        return parsed

Comment on lines +150 to +180
-- Extract locations (using the first location property)
CREATE OR REPLACE TEMPORARY TABLE `temp_locations` AS
SELECT subject_id AS entity_id, object_id AS location_id
FROM EXTERNAL_QUERY("{connection_id}",
'''SELECT subject_id, object_id FROM Edge
WHERE predicate = "{config.location_props[0]}"''')
WHERE subject_id IN (SELECT entity_id FROM `temp_entities`);
""")

if config.date_prop:
sql_parts.append(f"""
-- Extract dates
CREATE OR REPLACE TEMPORARY TABLE `temp_dates` AS
SELECT subject_id AS entity_id, object_id AS raw_date
FROM EXTERNAL_QUERY("{connection_id}",
'''SELECT subject_id, object_id FROM Edge
WHERE predicate = "{config.date_prop}"''')
WHERE subject_id IN (SELECT entity_id FROM `temp_entities`);
""")

# Extract constraint values
for c in parsed_constraints:
sql_parts.append(f"""
-- Extract constraint: {c['prop']}
CREATE OR REPLACE TEMPORARY TABLE `temp_constraint_{c['prop']}` AS
SELECT subject_id AS entity_id, object_id AS {c['prop']}_val
FROM EXTERNAL_QUERY("{connection_id}",
'''SELECT subject_id, object_id FROM Edge
WHERE predicate = "{c['prop']}"''')
WHERE subject_id IN (SELECT entity_id FROM `temp_entities`);
""")

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Escape config.location_props[0] and config.date_prop to prevent SQL syntax errors or SQL injection inside the Spanner query. Also, use the safe index-based table and column names for constraints.

        -- Extract locations (using the first location property)
        CREATE OR REPLACE TEMPORARY TABLE `temp_locations` AS
        SELECT subject_id AS entity_id, object_id AS location_id
        FROM EXTERNAL_QUERY("{connection_id}",
          '''SELECT subject_id, object_id FROM Edge 
             WHERE predicate = "{_escape_sql_literal(config.location_props[0])}"''')
        WHERE subject_id IN (SELECT entity_id FROM `temp_entities`);
        """)

        if config.date_prop:
            sql_parts.append(f"""
            -- Extract dates
            CREATE OR REPLACE TEMPORARY TABLE `temp_dates` AS
            SELECT subject_id AS entity_id, object_id AS raw_date
            FROM EXTERNAL_QUERY("{connection_id}",
              '''SELECT subject_id, object_id FROM Edge 
                 WHERE predicate = "{_escape_sql_literal(config.date_prop)}"''')
            WHERE subject_id IN (SELECT entity_id FROM `temp_entities`);
            """)

        # Extract constraint values
        for i, c in enumerate(parsed_constraints):
            sql_parts.append(f"""
            -- Extract constraint: {c['prop']}
            CREATE OR REPLACE TEMPORARY TABLE `temp_constraint_{i}` AS
            SELECT subject_id AS entity_id, object_id AS {c['col_name']}
            FROM EXTERNAL_QUERY("{connection_id}",
              '''SELECT subject_id, object_id FROM Edge 
                 WHERE predicate = "{_escape_sql_literal(c['prop'])}"''')
            WHERE subject_id IN (SELECT entity_id FROM `temp_entities`);
            """)

Comment on lines +198 to +211
# Constraint selects and joins
cte_cons_selects = ", ".join([f"c_{c['prop']}.{c['prop']}_val" for c in parsed_constraints])
if cte_cons_selects:
cte_cons_selects = ", " + cte_cons_selects

outer_cons_selects = ", ".join([f"{c['prop']}_val" for c in parsed_constraints])
if outer_cons_selects:
outer_cons_selects = ", " + outer_cons_selects

cons_joins = "\n".join([f"JOIN `temp_constraint_{c['prop']}` c_{c['prop']} ON e.entity_id = c_{c['prop']}.entity_id" for c in parsed_constraints])

# Constraint filters (only for non-wildcard constraints)
filters = [c['sql_filter'] for c in parsed_constraints if c['sql_filter']]
filter_clause = "AND " + " AND ".join(filters) if filters else ""

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Update the constraint selects, joins, and filters to use the safe index-based column and table names.

Suggested change
# Constraint selects and joins
cte_cons_selects = ", ".join([f"c_{c['prop']}.{c['prop']}_val" for c in parsed_constraints])
if cte_cons_selects:
cte_cons_selects = ", " + cte_cons_selects
outer_cons_selects = ", ".join([f"{c['prop']}_val" for c in parsed_constraints])
if outer_cons_selects:
outer_cons_selects = ", " + outer_cons_selects
cons_joins = "\n".join([f"JOIN `temp_constraint_{c['prop']}` c_{c['prop']} ON e.entity_id = c_{c['prop']}.entity_id" for c in parsed_constraints])
# Constraint filters (only for non-wildcard constraints)
filters = [c['sql_filter'] for c in parsed_constraints if c['sql_filter']]
filter_clause = "AND " + " AND ".join(filters) if filters else ""
# Constraint selects and joins
cte_cons_selects = ", ".join([f"c_{i}.{c['col_name']}" for i, c in enumerate(parsed_constraints)])
if cte_cons_selects:
cte_cons_selects = ", " + cte_cons_selects
outer_cons_selects = ", ".join([f"{c['col_name']}" for c in parsed_constraints])
if outer_cons_selects:
outer_cons_selects = ", " + outer_cons_selects
cons_joins = "\n".join([f"JOIN `temp_constraint_{i}` c_{i} ON e.entity_id = c_{i}.entity_id" for i, c in enumerate(parsed_constraints)])
# Constraint filters (only for non-wildcard constraints)
filters = [c['sql_filter'] for c in parsed_constraints if c['sql_filter']]
filter_clause = "AND " + " AND ".join(filters) if filters else ""

Comment on lines +243 to +250
wildcard_cols = [f"{c['prop']}_val" for c in parsed_constraints if c['is_wildcard']]
wildcard_select = ", ".join(wildcard_cols)
if wildcard_select:
wildcard_select = ", " + wildcard_select

group_by_cols = f"location_id, date_{fmt_clean}, period_{fmt_clean}"
if wildcard_cols:
group_by_cols += ", " + ", ".join(wildcard_cols)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Update the wildcard columns and group by columns to use the safe index-based column names.

Suggested change
wildcard_cols = [f"{c['prop']}_val" for c in parsed_constraints if c['is_wildcard']]
wildcard_select = ", ".join(wildcard_cols)
if wildcard_select:
wildcard_select = ", " + wildcard_select
group_by_cols = f"location_id, date_{fmt_clean}, period_{fmt_clean}"
if wildcard_cols:
group_by_cols += ", " + ", ".join(wildcard_cols)
wildcard_cols = [c['col_name'] for c in parsed_constraints if c['is_wildcard']]
wildcard_select = ", ".join(wildcard_cols)
if wildcard_select:
wildcard_select = ", " + wildcard_select
group_by_cols = f"location_id, date_{fmt_clean}, period_{fmt_clean}"
if wildcard_cols:
group_by_cols += ", " + ", ".join(wildcard_cols)

Comment on lines +293 to +340
concat_args = []
for key in sorted_keys:
if key in sv_props:
# Static property
concat_args.append(f"'{key}={sv_props[key]}'")
else:
# Wildcard property (dynamic from column)
concat_args.append(f"'{key}='")
concat_args.append(f"COALESCE({key}_val, '')")

# Join with comma
# To do this in SQL CONCAT, we need to interleave commas between the key=value pairs.
# e.g., CONCAT('key1=val1', ',', 'key2=', col2, ',', 'key3=val3')
sql_concat_args = []
for i, arg in enumerate(concat_args):
if i > 0 and not (i % 2 == 1 and concat_args[i-1].startswith("'") and not concat_args[i-1].endswith("=")):
# Add comma before next pair (only if we are not appending the value to its key)
# Wait, the logic is: we want commas BETWEEN pairs.
# A pair is either a single static string `'key=val'` or a split `'key='`, `col`.
# Let's simplify: build a list of "pair expressions" in SQL, then join them with CONCAT using a separator,
# or just construct the flat list of arguments for CONCAT in Python.
pass

# Let's construct the flat list of arguments for CONCAT in Python:
flat_args = []
for i, key in enumerate(sorted_keys):
if i > 0:
flat_args.append("','") # Add comma separator between pairs
if key in sv_props:
flat_args.append(f"'{key}={sv_props[key]}'")
else:
flat_args.append(f"'{key}='")
flat_args.append(f"COALESCE({key}_val, '')")

concat_expr = f"CONCAT({', '.join(flat_args)})"

# Now we can generate the SV DCID dynamically in a temp table
# We also generate the SV Name
name_parts = [f"Count of {config.entity_types[0]}"]
for c in parsed_constraints:
if not c['is_wildcard']:
name_parts.append(f"{c['prop']}={c['val_str']}")
else:
# For wildcards, we will append the dynamic value in SQL
# we can handle name generation in SQL if needed, but a simple name is fine,
# or we can just use the DCID as name if it's too complex.
# Let's construct a decent name in SQL.
pass

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Remove the dead code blocks (sql_concat_args and name_parts loops with pass statements) and update the deterministic SV DCID generation to use the safe index-based column names.

Suggested change
concat_args = []
for key in sorted_keys:
if key in sv_props:
# Static property
concat_args.append(f"'{key}={sv_props[key]}'")
else:
# Wildcard property (dynamic from column)
concat_args.append(f"'{key}='")
concat_args.append(f"COALESCE({key}_val, '')")
# Join with comma
# To do this in SQL CONCAT, we need to interleave commas between the key=value pairs.
# e.g., CONCAT('key1=val1', ',', 'key2=', col2, ',', 'key3=val3')
sql_concat_args = []
for i, arg in enumerate(concat_args):
if i > 0 and not (i % 2 == 1 and concat_args[i-1].startswith("'") and not concat_args[i-1].endswith("=")):
# Add comma before next pair (only if we are not appending the value to its key)
# Wait, the logic is: we want commas BETWEEN pairs.
# A pair is either a single static string `'key=val'` or a split `'key='`, `col`.
# Let's simplify: build a list of "pair expressions" in SQL, then join them with CONCAT using a separator,
# or just construct the flat list of arguments for CONCAT in Python.
pass
# Let's construct the flat list of arguments for CONCAT in Python:
flat_args = []
for i, key in enumerate(sorted_keys):
if i > 0:
flat_args.append("','") # Add comma separator between pairs
if key in sv_props:
flat_args.append(f"'{key}={sv_props[key]}'")
else:
flat_args.append(f"'{key}='")
flat_args.append(f"COALESCE({key}_val, '')")
concat_expr = f"CONCAT({', '.join(flat_args)})"
# Now we can generate the SV DCID dynamically in a temp table
# We also generate the SV Name
name_parts = [f"Count of {config.entity_types[0]}"]
for c in parsed_constraints:
if not c['is_wildcard']:
name_parts.append(f"{c['prop']}={c['val_str']}")
else:
# For wildcards, we will append the dynamic value in SQL
# we can handle name generation in SQL if needed, but a simple name is fine,
# or we can just use the DCID as name if it's too complex.
# Let's construct a decent name in SQL.
pass
# Let's construct the flat list of arguments for CONCAT in Python:
flat_args = []
for i, key in enumerate(sorted_keys):
if i > 0:
flat_args.append("','") # Add comma separator between pairs
if key in sv_props:
flat_args.append(f"'{key}={sv_props[key]}'")
else:
c = next(c for c in parsed_constraints if c['prop'] == key)
flat_args.append(f"'{key}='")
flat_args.append(f"COALESCE({c['col_name']}, '')")
concat_expr = f"CONCAT({', '.join(flat_args)})"

Comment on lines +348 to +352
# Add wildcards to name
wildcard_constraints = [c for c in parsed_constraints if c['is_wildcard']]
for c in wildcard_constraints:
name_flat_args.append(f"', {c['prop']}='")
name_flat_args.append(f"COALESCE({c['prop']}_val, 'unknown')")

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Update the wildcard name generation to use the safe index-based column names.

Suggested change
# Add wildcards to name
wildcard_constraints = [c for c in parsed_constraints if c['is_wildcard']]
for c in wildcard_constraints:
name_flat_args.append(f"', {c['prop']}='")
name_flat_args.append(f"COALESCE({c['prop']}_val, 'unknown')")
# Add wildcards to name
wildcard_constraints = [c for c in parsed_constraints if c['is_wildcard']]
for c in wildcard_constraints:
name_flat_args.append(f"', {c['prop']}='")
name_flat_args.append(f"COALESCE({c['col_name']}, 'unknown')")

Comment on lines +394 to +396
# Wildcard props
for c in wildcard_constraints:
edge_selects.append(f"SELECT DISTINCT sv_dcid AS subject_id, '{c['prop']}' AS predicate, {c['prop']}_val AS object_id, '{output_provenance}' AS provenance FROM `temp_aggregated_with_sv` WHERE {c['prop']}_val IS NOT NULL")

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Update the wildcard edge export query to use the safe index-based column names and escape the predicate.

Suggested change
# Wildcard props
for c in wildcard_constraints:
edge_selects.append(f"SELECT DISTINCT sv_dcid AS subject_id, '{c['prop']}' AS predicate, {c['prop']}_val AS object_id, '{output_provenance}' AS provenance FROM `temp_aggregated_with_sv` WHERE {c['prop']}_val IS NOT NULL")
# Wildcard props
for c in wildcard_constraints:
edge_selects.append(f"SELECT DISTINCT sv_dcid AS subject_id, '{_escape_sql_literal(c['prop'])}' AS predicate, {c['col_name']} AS object_id, '{output_provenance}' AS provenance FROM `temp_aggregated_with_sv` WHERE {c['col_name']} IS NOT NULL")

Comment on lines +121 to +136
def _generate_sql(self, config: EntityAggregationConfig) -> str:
"""Generates the multi-statement SQL script for a single aggregation config."""
connection_id = self.executor.connection_id
dest = self.executor.get_spanner_destination_uri()

prefix = "dc/base/" if self.is_base_dc else ""
output_provenance = f"{prefix}{config.output_import}"

safe_input_imports = [_escape_sql_literal(name) for name in config.input_imports]
input_provenances = [f"'{prefix}{name}'" for name in safe_input_imports]
input_provenances_str = ", ".join(input_provenances)

entity_types_str = ", ".join([f"'{_escape_sql_literal(t)}'" for t in config.entity_types])

# Parse constraints
parsed_constraints = self._parse_constraints(config.constraints)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

If any of the configuration lists (entity_types, location_props, input_imports, or agg_date_formats) are empty, the generated SQL query will contain syntax errors (such as IN () or trailing commas). We should add defensive validation at the beginning of _generate_sql to raise a ValueError if these lists are empty.

    def _generate_sql(self, config: EntityAggregationConfig) -> str:
        """Generates the multi-statement SQL script for a single aggregation config."""
        if not config.entity_types:
            raise ValueError("entity_types cannot be empty")
        if not config.location_props:
            raise ValueError("location_props cannot be empty")
        if not config.input_imports:
            raise ValueError("input_imports cannot be empty")
        if not config.agg_date_formats:
            raise ValueError("agg_date_formats cannot be empty")

        connection_id = self.executor.connection_id
        dest = self.executor.get_spanner_destination_uri()
        
        prefix = "dc/base/" if self.is_base_dc else ""
        output_provenance = f"{prefix}{config.output_import}"
        
        safe_input_imports = [_escape_sql_literal(name) for name in config.input_imports]
        input_provenances = [f"'{prefix}{name}'" for name in safe_input_imports]
        input_provenances_str = ", ".join(input_provenances)

        entity_types_str = ", ".join([f"'{_escape_sql_literal(t)}'" for t in config.entity_types])
        
        # Parse constraints
        parsed_constraints = self._parse_constraints(config.constraints)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant