feat: Add Entity Aggregations #570
Conversation
- Created EntityAggregationGenerator to replicate the C++ Flume entity aggregation logic using BigQuery Federation and Spanner EXPORT DATA.
- Implemented MCF constraint parsing, a multi-statement SQL pipeline, deterministic SV hashing, and dynamic SV naming.
- Added thorough E2E integration tests in aggregation_e2e_test.py covering ranges, wildcards, default dates, and multi-constraints in both Base and Custom DC modes.
- Relocated test classes to the middle of the file to prevent git merge conflicts.
Not up to standards ⛔
🔴 Issues
| Category | Results |
|---|---|
| UnusedCode | 1 medium 4 minor |
| Documentation | 4 minor |
| ErrorProne | 4 high |
| Security | 15 medium |
| CodeStyle | 10 minor |
| Complexity | 6 medium |
🟢 Metrics: 81 complexity
| Metric | Results |
|---|---|
| Complexity | 81 |
NEW Get contextual insights on your PRs based on Codacy's metrics, along with PR and Jira context, without leaving GitHub. Enable AI reviewer
TIP This summary will be updated as you push new changes.
There was a problem hiding this comment.
Code Review
This pull request introduces the EntityAggregationGenerator and EntityAggregationConfig classes to perform entity aggregations using BigQuery Federation, along with comprehensive integration tests. The review feedback highlights several critical security and robustness improvements for the SQL generation logic. Specifically, it recommends using safe, index-based column names instead of raw MCF-style property names to prevent SQL syntax errors and injection vulnerabilities, escaping dynamic properties like location and date, removing dead code blocks, and adding defensive validation for configuration inputs.
| def _parse_constraints(self, constraints: List[str]) -> List[Dict]: | ||
| """Parses MCF-style constraints into structured metadata and SQL clauses.""" | ||
| parsed = [] | ||
| for c in constraints: | ||
| if not c or ':' not in c: | ||
| continue | ||
| prop, val_str = [part.strip() for part in c.split(':', 1)] | ||
|
|
||
| # 1. Wildcard constraint | ||
| if val_str == '*': | ||
| parsed.append({ | ||
| 'prop': prop, | ||
| 'is_wildcard': True, | ||
| 'sql_filter': None, | ||
| 'val_str': val_str | ||
| }) | ||
| continue | ||
|
|
||
| # 2. Range constraint [Low - Unit] (e.g., [7 - M]) | ||
| range_low_match = re.match(r'^\[\s*([\d\.]+)\s*-\s*(\w+)\s*\]$', val_str) | ||
| if range_low_match: | ||
| low = range_low_match.group(1) | ||
| parsed.append({ | ||
| 'prop': prop, | ||
| 'is_wildcard': False, | ||
| 'sql_filter': f"SAFE_CAST({prop}_val AS FLOAT64) >= {low}", | ||
| 'val_str': val_str | ||
| }) | ||
| continue | ||
|
|
||
| # 3. Range constraint [Low High Unit] (e.g., [3 4 M]) | ||
| range_bounds_match = re.match(r'^\[\s*([\d\.]+)\s+([\d\.]+)\s+(\w+)\s*\]$', val_str) | ||
| if range_bounds_match: | ||
| low = range_bounds_match.group(1) | ||
| high = range_bounds_match.group(2) | ||
| parsed.append({ | ||
| 'prop': prop, | ||
| 'is_wildcard': False, | ||
| 'sql_filter': f"SAFE_CAST({prop}_val AS FLOAT64) >= {low} AND SAFE_CAST({prop}_val AS FLOAT64) <= {high}", | ||
| 'val_str': val_str | ||
| }) | ||
| continue | ||
|
|
||
| # 4. Simple literal value constraint (e.g., magnitudeType: MagnitudeMl) | ||
| parsed.append({ | ||
| 'prop': prop, | ||
| 'is_wildcard': False, | ||
| 'sql_filter': f"{prop}_val = '{_escape_sql_literal(val_str)}'", | ||
| 'val_str': val_str | ||
| }) | ||
| return parsed |
There was a problem hiding this comment.
MCF-style property names (such as magnitude-type or properties containing special characters) are not always valid unquoted SQL identifiers. Using them directly as column names or temporary table names can lead to SQL syntax errors or potential SQL injection vulnerabilities.
We should use safe, index-based column names (e.g., val_0, val_1) for the temporary tables and query columns, while keeping the original property name for the Spanner predicate matching (escaped using _escape_sql_literal).
def _parse_constraints(self, constraints: List[str]) -> List[Dict]:
"""Parses MCF-style constraints into structured metadata and SQL clauses."""
parsed = []
for i, c in enumerate(constraints):
if not c or ':' not in c:
continue
prop, val_str = [part.strip() for part in c.split(':', 1)]
col_name = f"val_{i}"
# 1. Wildcard constraint
if val_str == '*':
parsed.append({
'prop': prop,
'col_name': col_name,
'is_wildcard': True,
'sql_filter': None,
'val_str': val_str
})
continue
# 2. Range constraint [Low - Unit] (e.g., [7 - M])
range_low_match = re.match(r'^\[\s*([\d\.]+)\s*-\s*(\w+)\s*\]$', val_str)
if range_low_match:
low = range_low_match.group(1)
parsed.append({
'prop': prop,
'col_name': col_name,
'is_wildcard': False,
'sql_filter': f"SAFE_CAST({col_name} AS FLOAT64) >= {low}",
'val_str': val_str
})
continue
# 3. Range constraint [Low High Unit] (e.g., [3 4 M])
range_bounds_match = re.match(r'^\[\s*([\d\.]+)\s+([\d\.]+)\s+(\w+)\s*\]$', val_str)
if range_bounds_match:
low = range_bounds_match.group(1)
high = range_bounds_match.group(2)
parsed.append({
'prop': prop,
'col_name': col_name,
'is_wildcard': False,
'sql_filter': f"SAFE_CAST({col_name} AS FLOAT64) >= {low} AND SAFE_CAST({col_name} AS FLOAT64) <= {high}",
'val_str': val_str
})
continue
# 4. Simple literal value constraint (e.g., magnitudeType: MagnitudeMl)
parsed.append({
'prop': prop,
'col_name': col_name,
'is_wildcard': False,
'sql_filter': f"{col_name} = '{_escape_sql_literal(val_str)}'",
'val_str': val_str
})
return parsed
| -- Extract locations (using the first location property) | ||
| CREATE OR REPLACE TEMPORARY TABLE `temp_locations` AS | ||
| SELECT subject_id AS entity_id, object_id AS location_id | ||
| FROM EXTERNAL_QUERY("{connection_id}", | ||
| '''SELECT subject_id, object_id FROM Edge | ||
| WHERE predicate = "{config.location_props[0]}"''') | ||
| WHERE subject_id IN (SELECT entity_id FROM `temp_entities`); | ||
| """) | ||
|
|
||
| if config.date_prop: | ||
| sql_parts.append(f""" | ||
| -- Extract dates | ||
| CREATE OR REPLACE TEMPORARY TABLE `temp_dates` AS | ||
| SELECT subject_id AS entity_id, object_id AS raw_date | ||
| FROM EXTERNAL_QUERY("{connection_id}", | ||
| '''SELECT subject_id, object_id FROM Edge | ||
| WHERE predicate = "{config.date_prop}"''') | ||
| WHERE subject_id IN (SELECT entity_id FROM `temp_entities`); | ||
| """) | ||
|
|
||
| # Extract constraint values | ||
| for c in parsed_constraints: | ||
| sql_parts.append(f""" | ||
| -- Extract constraint: {c['prop']} | ||
| CREATE OR REPLACE TEMPORARY TABLE `temp_constraint_{c['prop']}` AS | ||
| SELECT subject_id AS entity_id, object_id AS {c['prop']}_val | ||
| FROM EXTERNAL_QUERY("{connection_id}", | ||
| '''SELECT subject_id, object_id FROM Edge | ||
| WHERE predicate = "{c['prop']}"''') | ||
| WHERE subject_id IN (SELECT entity_id FROM `temp_entities`); | ||
| """) |
There was a problem hiding this comment.
Escape config.location_props[0] and config.date_prop to prevent SQL syntax errors or SQL injection inside the Spanner query. Also, use the safe index-based table and column names for constraints.
-- Extract locations (using the first location property)
CREATE OR REPLACE TEMPORARY TABLE `temp_locations` AS
SELECT subject_id AS entity_id, object_id AS location_id
FROM EXTERNAL_QUERY("{connection_id}",
'''SELECT subject_id, object_id FROM Edge
WHERE predicate = "{_escape_sql_literal(config.location_props[0])}"''')
WHERE subject_id IN (SELECT entity_id FROM `temp_entities`);
""")
if config.date_prop:
sql_parts.append(f"""
-- Extract dates
CREATE OR REPLACE TEMPORARY TABLE `temp_dates` AS
SELECT subject_id AS entity_id, object_id AS raw_date
FROM EXTERNAL_QUERY("{connection_id}",
'''SELECT subject_id, object_id FROM Edge
WHERE predicate = "{_escape_sql_literal(config.date_prop)}"''')
WHERE subject_id IN (SELECT entity_id FROM `temp_entities`);
""")
# Extract constraint values
for i, c in enumerate(parsed_constraints):
sql_parts.append(f"""
-- Extract constraint: {c['prop']}
CREATE OR REPLACE TEMPORARY TABLE `temp_constraint_{i}` AS
SELECT subject_id AS entity_id, object_id AS {c['col_name']}
FROM EXTERNAL_QUERY("{connection_id}",
'''SELECT subject_id, object_id FROM Edge
WHERE predicate = "{_escape_sql_literal(c['prop'])}"''')
WHERE subject_id IN (SELECT entity_id FROM `temp_entities`);
""")| # Constraint selects and joins | ||
| cte_cons_selects = ", ".join([f"c_{c['prop']}.{c['prop']}_val" for c in parsed_constraints]) | ||
| if cte_cons_selects: | ||
| cte_cons_selects = ", " + cte_cons_selects | ||
|
|
||
| outer_cons_selects = ", ".join([f"{c['prop']}_val" for c in parsed_constraints]) | ||
| if outer_cons_selects: | ||
| outer_cons_selects = ", " + outer_cons_selects | ||
|
|
||
| cons_joins = "\n".join([f"JOIN `temp_constraint_{c['prop']}` c_{c['prop']} ON e.entity_id = c_{c['prop']}.entity_id" for c in parsed_constraints]) | ||
|
|
||
| # Constraint filters (only for non-wildcard constraints) | ||
| filters = [c['sql_filter'] for c in parsed_constraints if c['sql_filter']] | ||
| filter_clause = "AND " + " AND ".join(filters) if filters else "" |
There was a problem hiding this comment.
Update the constraint selects, joins, and filters to use the safe index-based column and table names.
| # Constraint selects and joins | |
| cte_cons_selects = ", ".join([f"c_{c['prop']}.{c['prop']}_val" for c in parsed_constraints]) | |
| if cte_cons_selects: | |
| cte_cons_selects = ", " + cte_cons_selects | |
| outer_cons_selects = ", ".join([f"{c['prop']}_val" for c in parsed_constraints]) | |
| if outer_cons_selects: | |
| outer_cons_selects = ", " + outer_cons_selects | |
| cons_joins = "\n".join([f"JOIN `temp_constraint_{c['prop']}` c_{c['prop']} ON e.entity_id = c_{c['prop']}.entity_id" for c in parsed_constraints]) | |
| # Constraint filters (only for non-wildcard constraints) | |
| filters = [c['sql_filter'] for c in parsed_constraints if c['sql_filter']] | |
| filter_clause = "AND " + " AND ".join(filters) if filters else "" | |
| # Constraint selects and joins | |
| cte_cons_selects = ", ".join([f"c_{i}.{c['col_name']}" for i, c in enumerate(parsed_constraints)]) | |
| if cte_cons_selects: | |
| cte_cons_selects = ", " + cte_cons_selects | |
| outer_cons_selects = ", ".join([f"{c['col_name']}" for c in parsed_constraints]) | |
| if outer_cons_selects: | |
| outer_cons_selects = ", " + outer_cons_selects | |
| cons_joins = "\n".join([f"JOIN `temp_constraint_{i}` c_{i} ON e.entity_id = c_{i}.entity_id" for i, c in enumerate(parsed_constraints)]) | |
| # Constraint filters (only for non-wildcard constraints) | |
| filters = [c['sql_filter'] for c in parsed_constraints if c['sql_filter']] | |
| filter_clause = "AND " + " AND ".join(filters) if filters else "" |
| wildcard_cols = [f"{c['prop']}_val" for c in parsed_constraints if c['is_wildcard']] | ||
| wildcard_select = ", ".join(wildcard_cols) | ||
| if wildcard_select: | ||
| wildcard_select = ", " + wildcard_select | ||
|
|
||
| group_by_cols = f"location_id, date_{fmt_clean}, period_{fmt_clean}" | ||
| if wildcard_cols: | ||
| group_by_cols += ", " + ", ".join(wildcard_cols) |
There was a problem hiding this comment.
Update the wildcard columns and group by columns to use the safe index-based column names.
| wildcard_cols = [f"{c['prop']}_val" for c in parsed_constraints if c['is_wildcard']] | |
| wildcard_select = ", ".join(wildcard_cols) | |
| if wildcard_select: | |
| wildcard_select = ", " + wildcard_select | |
| group_by_cols = f"location_id, date_{fmt_clean}, period_{fmt_clean}" | |
| if wildcard_cols: | |
| group_by_cols += ", " + ", ".join(wildcard_cols) | |
| wildcard_cols = [c['col_name'] for c in parsed_constraints if c['is_wildcard']] | |
| wildcard_select = ", ".join(wildcard_cols) | |
| if wildcard_select: | |
| wildcard_select = ", " + wildcard_select | |
| group_by_cols = f"location_id, date_{fmt_clean}, period_{fmt_clean}" | |
| if wildcard_cols: | |
| group_by_cols += ", " + ", ".join(wildcard_cols) |
| concat_args = [] | ||
| for key in sorted_keys: | ||
| if key in sv_props: | ||
| # Static property | ||
| concat_args.append(f"'{key}={sv_props[key]}'") | ||
| else: | ||
| # Wildcard property (dynamic from column) | ||
| concat_args.append(f"'{key}='") | ||
| concat_args.append(f"COALESCE({key}_val, '')") | ||
|
|
||
| # Join with comma | ||
| # To do this in SQL CONCAT, we need to interleave commas between the key=value pairs. | ||
| # e.g., CONCAT('key1=val1', ',', 'key2=', col2, ',', 'key3=val3') | ||
| sql_concat_args = [] | ||
| for i, arg in enumerate(concat_args): | ||
| if i > 0 and not (i % 2 == 1 and concat_args[i-1].startswith("'") and not concat_args[i-1].endswith("=")): | ||
| # Add comma before next pair (only if we are not appending the value to its key) | ||
| # Wait, the logic is: we want commas BETWEEN pairs. | ||
| # A pair is either a single static string `'key=val'` or a split `'key='`, `col`. | ||
| # Let's simplify: build a list of "pair expressions" in SQL, then join them with CONCAT using a separator, | ||
| # or just construct the flat list of arguments for CONCAT in Python. | ||
| pass | ||
|
|
||
| # Let's construct the flat list of arguments for CONCAT in Python: | ||
| flat_args = [] | ||
| for i, key in enumerate(sorted_keys): | ||
| if i > 0: | ||
| flat_args.append("','") # Add comma separator between pairs | ||
| if key in sv_props: | ||
| flat_args.append(f"'{key}={sv_props[key]}'") | ||
| else: | ||
| flat_args.append(f"'{key}='") | ||
| flat_args.append(f"COALESCE({key}_val, '')") | ||
|
|
||
| concat_expr = f"CONCAT({', '.join(flat_args)})" | ||
|
|
||
| # Now we can generate the SV DCID dynamically in a temp table | ||
| # We also generate the SV Name | ||
| name_parts = [f"Count of {config.entity_types[0]}"] | ||
| for c in parsed_constraints: | ||
| if not c['is_wildcard']: | ||
| name_parts.append(f"{c['prop']}={c['val_str']}") | ||
| else: | ||
| # For wildcards, we will append the dynamic value in SQL | ||
| # we can handle name generation in SQL if needed, but a simple name is fine, | ||
| # or we can just use the DCID as name if it's too complex. | ||
| # Let's construct a decent name in SQL. | ||
| pass |
There was a problem hiding this comment.
Remove the dead code blocks (sql_concat_args and name_parts loops with pass statements) and update the deterministic SV DCID generation to use the safe index-based column names.
| concat_args = [] | |
| for key in sorted_keys: | |
| if key in sv_props: | |
| # Static property | |
| concat_args.append(f"'{key}={sv_props[key]}'") | |
| else: | |
| # Wildcard property (dynamic from column) | |
| concat_args.append(f"'{key}='") | |
| concat_args.append(f"COALESCE({key}_val, '')") | |
| # Join with comma | |
| # To do this in SQL CONCAT, we need to interleave commas between the key=value pairs. | |
| # e.g., CONCAT('key1=val1', ',', 'key2=', col2, ',', 'key3=val3') | |
| sql_concat_args = [] | |
| for i, arg in enumerate(concat_args): | |
| if i > 0 and not (i % 2 == 1 and concat_args[i-1].startswith("'") and not concat_args[i-1].endswith("=")): | |
| # Add comma before next pair (only if we are not appending the value to its key) | |
| # Wait, the logic is: we want commas BETWEEN pairs. | |
| # A pair is either a single static string `'key=val'` or a split `'key='`, `col`. | |
| # Let's simplify: build a list of "pair expressions" in SQL, then join them with CONCAT using a separator, | |
| # or just construct the flat list of arguments for CONCAT in Python. | |
| pass | |
| # Let's construct the flat list of arguments for CONCAT in Python: | |
| flat_args = [] | |
| for i, key in enumerate(sorted_keys): | |
| if i > 0: | |
| flat_args.append("','") # Add comma separator between pairs | |
| if key in sv_props: | |
| flat_args.append(f"'{key}={sv_props[key]}'") | |
| else: | |
| flat_args.append(f"'{key}='") | |
| flat_args.append(f"COALESCE({key}_val, '')") | |
| concat_expr = f"CONCAT({', '.join(flat_args)})" | |
| # Now we can generate the SV DCID dynamically in a temp table | |
| # We also generate the SV Name | |
| name_parts = [f"Count of {config.entity_types[0]}"] | |
| for c in parsed_constraints: | |
| if not c['is_wildcard']: | |
| name_parts.append(f"{c['prop']}={c['val_str']}") | |
| else: | |
| # For wildcards, we will append the dynamic value in SQL | |
| # we can handle name generation in SQL if needed, but a simple name is fine, | |
| # or we can just use the DCID as name if it's too complex. | |
| # Let's construct a decent name in SQL. | |
| pass | |
| # Let's construct the flat list of arguments for CONCAT in Python: | |
| flat_args = [] | |
| for i, key in enumerate(sorted_keys): | |
| if i > 0: | |
| flat_args.append("','") # Add comma separator between pairs | |
| if key in sv_props: | |
| flat_args.append(f"'{key}={sv_props[key]}'") | |
| else: | |
| c = next(c for c in parsed_constraints if c['prop'] == key) | |
| flat_args.append(f"'{key}='") | |
| flat_args.append(f"COALESCE({c['col_name']}, '')") | |
| concat_expr = f"CONCAT({', '.join(flat_args)})" |
| # Add wildcards to name | ||
| wildcard_constraints = [c for c in parsed_constraints if c['is_wildcard']] | ||
| for c in wildcard_constraints: | ||
| name_flat_args.append(f"', {c['prop']}='") | ||
| name_flat_args.append(f"COALESCE({c['prop']}_val, 'unknown')") |
There was a problem hiding this comment.
Update the wildcard name generation to use the safe index-based column names.
| # Add wildcards to name | |
| wildcard_constraints = [c for c in parsed_constraints if c['is_wildcard']] | |
| for c in wildcard_constraints: | |
| name_flat_args.append(f"', {c['prop']}='") | |
| name_flat_args.append(f"COALESCE({c['prop']}_val, 'unknown')") | |
| # Add wildcards to name | |
| wildcard_constraints = [c for c in parsed_constraints if c['is_wildcard']] | |
| for c in wildcard_constraints: | |
| name_flat_args.append(f"', {c['prop']}='") | |
| name_flat_args.append(f"COALESCE({c['col_name']}, 'unknown')") |
| # Wildcard props | ||
| for c in wildcard_constraints: | ||
| edge_selects.append(f"SELECT DISTINCT sv_dcid AS subject_id, '{c['prop']}' AS predicate, {c['prop']}_val AS object_id, '{output_provenance}' AS provenance FROM `temp_aggregated_with_sv` WHERE {c['prop']}_val IS NOT NULL") |
There was a problem hiding this comment.
Update the wildcard edge export query to use the safe index-based column names and escape the predicate.
| # Wildcard props | |
| for c in wildcard_constraints: | |
| edge_selects.append(f"SELECT DISTINCT sv_dcid AS subject_id, '{c['prop']}' AS predicate, {c['prop']}_val AS object_id, '{output_provenance}' AS provenance FROM `temp_aggregated_with_sv` WHERE {c['prop']}_val IS NOT NULL") | |
| # Wildcard props | |
| for c in wildcard_constraints: | |
| edge_selects.append(f"SELECT DISTINCT sv_dcid AS subject_id, '{_escape_sql_literal(c['prop'])}' AS predicate, {c['col_name']} AS object_id, '{output_provenance}' AS provenance FROM `temp_aggregated_with_sv` WHERE {c['col_name']} IS NOT NULL") |
| def _generate_sql(self, config: EntityAggregationConfig) -> str: | ||
| """Generates the multi-statement SQL script for a single aggregation config.""" | ||
| connection_id = self.executor.connection_id | ||
| dest = self.executor.get_spanner_destination_uri() | ||
|
|
||
| prefix = "dc/base/" if self.is_base_dc else "" | ||
| output_provenance = f"{prefix}{config.output_import}" | ||
|
|
||
| safe_input_imports = [_escape_sql_literal(name) for name in config.input_imports] | ||
| input_provenances = [f"'{prefix}{name}'" for name in safe_input_imports] | ||
| input_provenances_str = ", ".join(input_provenances) | ||
|
|
||
| entity_types_str = ", ".join([f"'{_escape_sql_literal(t)}'" for t in config.entity_types]) | ||
|
|
||
| # Parse constraints | ||
| parsed_constraints = self._parse_constraints(config.constraints) |
There was a problem hiding this comment.
If any of the configuration lists (entity_types, location_props, input_imports, or agg_date_formats) are empty, the generated SQL query will contain syntax errors (such as IN () or trailing commas). We should add defensive validation at the beginning of _generate_sql to raise a ValueError if these lists are empty.
def _generate_sql(self, config: EntityAggregationConfig) -> str:
"""Generates the multi-statement SQL script for a single aggregation config."""
if not config.entity_types:
raise ValueError("entity_types cannot be empty")
if not config.location_props:
raise ValueError("location_props cannot be empty")
if not config.input_imports:
raise ValueError("input_imports cannot be empty")
if not config.agg_date_formats:
raise ValueError("agg_date_formats cannot be empty")
connection_id = self.executor.connection_id
dest = self.executor.get_spanner_destination_uri()
prefix = "dc/base/" if self.is_base_dc else ""
output_provenance = f"{prefix}{config.output_import}"
safe_input_imports = [_escape_sql_literal(name) for name in config.input_imports]
input_provenances = [f"'{prefix}{name}'" for name in safe_input_imports]
input_provenances_str = ", ".join(input_provenances)
entity_types_str = ", ".join([f"'{_escape_sql_literal(t)}'" for t in config.entity_types])
# Parse constraints
parsed_constraints = self._parse_constraints(config.constraints)
Work in Progress. Do not review.