From f9bea525cce8d7ec9d37ecfc85cc22ce6b3709f1 Mon Sep 17 00:00:00 2001 From: Mathieu Guillame-Bert Date: Fri, 12 Jun 2026 10:19:56 -0700 Subject: [PATCH] Implement Spanner Graph Sampler using CTEs PiperOrigin-RevId: 931213293 --- .../io/gcp/spanner_graph_integration_test.py | 3 +- dgf/src/sampling/gcp/BUILD | 6 +- dgf/src/sampling/gcp/spanner_graph_sampler.py | 617 +++++++++------ .../spanner_graph_sampler_integration_test.py | 327 ++++++-- .../gcp/spanner_graph_sampler_test.py | 233 +++--- ..._graph_sampler_test_generate_cte_query.txt | 251 ++++++ ...mpler_test_generate_cte_query_directed.txt | 107 +++ ..._graph_sampler_test_generate_gql_query.txt | 56 -- ...ler_test_json_to_in_memory_graphs_cte.json | 740 ++++++++++++++++++ 9 files changed, 1853 insertions(+), 487 deletions(-) create mode 100644 test_data/spanner_graph_sampler_test_generate_cte_query.txt create mode 100644 test_data/spanner_graph_sampler_test_generate_cte_query_directed.txt delete mode 100644 test_data/spanner_graph_sampler_test_generate_gql_query.txt create mode 100644 test_data/spanner_graph_sampler_test_json_to_in_memory_graphs_cte.json diff --git a/dgf/src/io/gcp/spanner_graph_integration_test.py b/dgf/src/io/gcp/spanner_graph_integration_test.py index 2dd9370..41f51a6 100644 --- a/dgf/src/io/gcp/spanner_graph_integration_test.py +++ b/dgf/src/io/gcp/spanner_graph_integration_test.py @@ -30,10 +30,11 @@ def expected_schema() -> schema_lib.GraphSchema: "id": schema_lib.FeatureSchema( format=schema_lib.FeatureFormat.BYTES, semantic=schema_lib.FeatureSemantic.PRIMARY_ID, + is_utf8_string=True, ), "feat": schema_lib.FeatureSchema( format=schema_lib.FeatureFormat.FLOAT_64, - shape=(128,), + shape=(None,), ), "labels": schema_lib.FeatureSchema( format=schema_lib.FeatureFormat.INTEGER_64, diff --git a/dgf/src/sampling/gcp/BUILD b/dgf/src/sampling/gcp/BUILD index 53cc3fd..460d867 100644 --- a/dgf/src/sampling/gcp/BUILD +++ b/dgf/src/sampling/gcp/BUILD @@ -20,10 +20,14 @@ py_binary( 
":spanner_graph_sampler", # absl/testing:absltest dep, # absl/testing:parameterized dep, + "//dgf/src/analyse:schema", "//dgf/src/data:in_memory_graph", "//dgf/src/data:schema", - "//dgf/src/plot:network", + "//dgf/src/io:cache", + "//dgf/src/io/gcp:spanner_graph", "//dgf/src/sampling:config", + "//dgf/src/sampling:in_memory_sampler", + "//dgf/src/util:test_util", "//dgf/src/validate:in_memory_graph", # numpy dep, ], diff --git a/dgf/src/sampling/gcp/spanner_graph_sampler.py b/dgf/src/sampling/gcp/spanner_graph_sampler.py index 78ea609..111f3a4 100644 --- a/dgf/src/sampling/gcp/spanner_graph_sampler.py +++ b/dgf/src/sampling/gcp/spanner_graph_sampler.py @@ -14,8 +14,10 @@ """Graph sampling using Google Spanner Graph's GQL engine.""" +import base64 +import collections import dataclasses -import logging +import json from typing import Any, Dict, List, Set, Tuple, Union from dgf.src.analyse import schema as analyse_schema_lib from dgf.src.data import in_memory_graph as in_memory_graph_lib @@ -25,8 +27,52 @@ from google.cloud import spanner import numpy as np -# The possible types of node ids. -NodeId = Union[str, int] +# Node IDs are always bytes in DGF. +NormalizedNodeId = bytes + + +def _normalize_node_id( + node_id: Union[str, bytes], + feature_schema: schema_lib.FeatureSchema, + is_from_spanner: bool = False, +) -> bytes: + if is_from_spanner and isinstance(node_id, bytes): + try: + node_id = base64.b64decode(node_id) + except Exception: + pass + + if isinstance(node_id, str): + node_id = node_id.encode("utf-8") + + if feature_schema.format != schema_lib.FeatureFormat.BYTES: + raise ValueError( + f"Unsupported key format: {feature_schema.format}. Only BYTES (string)" + " is supported." 
+ ) + + return bytes(node_id) + + +def _node_id_expression( + var: str, pk: str, nodeset: str, schema: schema_lib.GraphSchema +) -> str: + feat_schema = schema.node_sets[nodeset].features[pk] + if feat_schema.is_utf8_string: + return f"CAST({var}.{pk} AS BYTES)" + else: + return f"{var}.{pk}" + + +def _sql_cast_to_bytes( + col_name: str, nodeset_name: str, schema: schema_lib.GraphSchema +) -> str: + pk = _get_primary_key(nodeset_name, schema) + feat_schema = schema.node_sets[nodeset_name].features[pk] + if feat_schema.is_utf8_string: + return f"CAST({col_name} AS BYTES)" + return col_name + RawFeatures = Dict[str, list] @@ -48,14 +94,11 @@ class RawGraph: node_sets: Dict[str, RawNodeset] edge_sets: Dict[str, RawEdgeset] - # Mapping from the Spanner graph internal node ids (which is different from - # the classical id nodes) to the per-nodeset dense node index in the raw - # graph. "already_visited" is used to avoid double counting nodes, and to - # create the adjacency lists. - spanner_node_ids: Dict[str, int] = dataclasses.field(default_factory=dict) + # Mapping from the Spanner graph internal node ids to the per-nodeset dense + # node index in the raw graph. + spanner_node_ids: Dict[bytes, int] = dataclasses.field(default_factory=dict) - # Set of Spanner graph internal edge ids (or constructed keys) to avoid - # double counting edges. + # Set of Spanner graph internal edge ids to avoid double counting edges. 
spanner_edge_ids: Set[Any] = dataclasses.field(default_factory=set) @@ -74,6 +117,167 @@ class SpannerGraphConnection: database: Any +def _get_primary_key(nodeset_name: str, schema: schema_lib.GraphSchema) -> str: + return analyse_schema_lib.primary_feature( + nodeset_name, schema.node_sets[nodeset_name] + ) + + +class CteQueryGenerator: + """Generates GQL/SQL queries that return both graph structure and features in a single call.""" + + def __init__( + self, + graph_name: str, + schema: schema_lib.GraphSchema, + plan: config_lib.SamplingPlan, + debug_sampling: bool, + ): + self.graph_name = graph_name + self.schema = schema + self.plan = plan + self.debug_sampling = debug_sampling + self.cte_defs = [] + self.union_queries = [] + self.var_count = 0 + self.order_by_str = "" if debug_sampling else " ORDER BY GENERATE_UUID()" + + def generate(self) -> str: + root_nodeset = self.plan.root.nodeset + root_pk = _get_primary_key(root_nodeset, self.schema) + + # Native ID in source_node CTE (no cast) + source_node_cte = f"""source_node AS ( + SELECT n0_id AS seed_id, n0_id AS n0_id + FROM GRAPH_TABLE({self.graph_name} + MATCH (n0:{root_nodeset} WHERE n0.{root_pk} IN UNNEST(@seed_ids)) + RETURN n0.{root_pk} AS n0_id + ) +)""" + self.cte_defs.append(source_node_cte) + + # Union seed nodes (cast to BYTES for the union, join to get features) + seed_id_cast = _sql_cast_to_bytes("n0_id", root_nodeset, self.schema) + self.union_queries.append( + f"SELECT {seed_id_cast} AS seed_id, {seed_id_cast} AS node_id, 'node'" + f" AS element_type, '{root_nodeset}' AS element_class, CAST(NULL AS" + " BYTES) AS source_id, CAST(NULL AS BYTES) AS target_id, TO_JSON(n) AS" + f" properties_json FROM source_node AS h JOIN {root_nodeset} AS n ON" + f" h.n0_id = n.{root_pk}" + ) + + self._visit_node(self.plan.root, "source_node", "n0_id") + + cte_part = "WITH\n " + ",\n ".join(self.cte_defs) + union_part = "\nUNION ALL\n".join(self.union_queries) + return ( + f"{cte_part}\nSELECT seed_id, node_id, 
element_type, element_class," + f" source_id, target_id, properties_json FROM (\n{union_part}\n)" + ) + + def _visit_node( + self, + node: config_lib.PlanNode, + parent_cte: str, + parent_id_col: str, + ): + for edge in node.children: + self.var_count += 1 + child_var = f"n{self.var_count}" + cte_name = f"hop_{self.var_count}" + + child_nodeset = edge.node.nodeset + parent_nodeset = node.nodeset + + parent_pk = _get_primary_key(parent_nodeset, self.schema) + child_pk = _get_primary_key(child_nodeset, self.schema) + + limit = edge.hop_width + gql_parent_var = "gp" + gql_child_var = "gc" + gql_edge_var = "ge" + + # Clean arrow syntax + if edge.reversed: + left_arrow = "<-" + right_arrow = "-" + else: + left_arrow = "-" + right_arrow = "->" + + match_pattern = f"({gql_parent_var}:{parent_nodeset}){left_arrow}[{gql_edge_var}:{edge.edgeset}]{right_arrow}({gql_child_var}:{child_nodeset})" + + # Native ID expressions (no cast) + parent_id_expr_native = f"{gql_parent_var}.{parent_pk}" + child_id_expr_native = f"{gql_child_var}.{child_pk}" + + # Localized IS_FIRST partition by gp.id (parent native ID) + if self.order_by_str: + over_clause = ( + "OVER (PARTITION BY" + f" {gql_parent_var}.{parent_pk}{self.order_by_str})" + ) + else: + over_clause = f"OVER (PARTITION BY {gql_parent_var}.{parent_pk})" + + gql_query = f"""GRAPH_TABLE({self.graph_name} + MATCH {match_pattern} + FILTER IS_FIRST({limit}) {over_clause} + RETURN {parent_id_expr_native} AS parent_id, TO_JSON({gql_edge_var}) AS edge_json, {child_id_expr_native} AS child_id + )""" + + # Propagate native IDs in CTE, join on native IDs + cte_def = f"""{cte_name} AS ( + SELECT p.seed_id, gt.edge_json, gt.child_id AS {child_var}_id, p.{parent_id_col} AS parent_id + FROM {parent_cte} p + JOIN {gql_query} AS gt ON p.{parent_id_col} = gt.parent_id +)""" + self.cte_defs.append(cte_def) + + # Cast to BYTES for the final UNION ALL + root_nodeset = self.plan.root.nodeset + seed_id_cast = _sql_cast_to_bytes("seed_id", root_nodeset, 
self.schema) + parent_id_cast = _sql_cast_to_bytes( + "parent_id", parent_nodeset, self.schema + ) + child_id_cast = _sql_cast_to_bytes( + f"{child_var}_id", child_nodeset, self.schema + ) + + # Union edges + if edge.reversed: + self.union_queries.append( + f"SELECT {seed_id_cast} AS seed_id, CAST(NULL AS BYTES) AS node_id," + f" 'edge' AS element_type, '{edge.edgeset}' AS element_class," + f" {child_id_cast} AS source_id, {parent_id_cast} AS target_id," + f" edge_json AS properties_json FROM {cte_name}" + ) + else: + self.union_queries.append( + f"SELECT {seed_id_cast} AS seed_id, CAST(NULL AS BYTES) AS node_id," + f" 'edge' AS element_type, '{edge.edgeset}' AS element_class," + f" {parent_id_cast} AS source_id, {child_id_cast} AS target_id," + f" edge_json AS properties_json FROM {cte_name}" + ) + + # Union nodes (with features, JOIN child table) + node_union_query = f""" + SELECT + {seed_id_cast} AS seed_id, + {child_id_cast} AS node_id, + 'node' AS element_type, + '{child_nodeset}' AS element_class, + CAST(NULL AS BYTES) AS source_id, + CAST(NULL AS BYTES) AS target_id, + TO_JSON(n) AS properties_json + FROM {cte_name} AS h + JOIN {child_nodeset} AS n ON h.{child_var}_id = n.{child_pk} + """ + self.union_queries.append(node_union_query) + + self._visit_node(edge.node, cte_name, f"{child_var}_id") + + class SpannerGraphSampler: """Sampler that executes queries on Spanner directly to fetch subgraphs.""" @@ -100,27 +304,64 @@ def _get_connection(self) -> SpannerGraphConnection: ) return self._connection - def _get_gql_query(self, seed_ids: List[NodeId]) -> str: - return _generate_gql_query( + def _get_query(self) -> str: + generator = CteQueryGenerator( graph_name=self._graph.graph, - plan=self._plan, schema=self._schema, - seed_ids=seed_ids, + plan=self._plan, debug_sampling=self._debug_sampling, ) + return generator.generate() + + def sample( + self, seed_ids: List[bytes] + ) -> List[in_memory_graph_lib.InMemoryGraph]: + """Samples subgraphs starting from the 
given seed nodes. + + Args: + seed_ids: The list of node IDs (as bytes) to use as seeds. - def sample_to_json(self, seed_ids: List[NodeId]): - query = self._get_gql_query(seed_ids) + Returns: + A list of InMemoryGraph objects corresponding to the subgraphs sampled for + each seed ID. + """ connection = self._get_connection() + query = self._get_query() + + # Prepare parameters for secure query execution. + root_nodeset = self._plan.root.nodeset + root_pk = _get_primary_key(root_nodeset, self._schema) + root_pk_schema = self._schema.node_sets[root_nodeset].features[root_pk] + normalized_seeds = [ + _normalize_node_id(sid, root_pk_schema) for sid in seed_ids + ] + + if root_pk_schema.is_utf8_string: + # Decode bytes to string for Spanner STRING type. + params = {"seed_ids": [sid.decode("utf-8") for sid in normalized_seeds]} + param_types = { + "seed_ids": spanner.param_types.Array(spanner.param_types.STRING) + } + else: + params = {"seed_ids": normalized_seeds} + param_types = { + "seed_ids": spanner.param_types.Array(spanner.param_types.BYTES) + } + with connection.database.snapshot() as snapshot: - return list(snapshot.execute_sql(query)) + json_results = list( + snapshot.execute_sql( + query, + params=params, + param_types=param_types, + ) + ) - def sample( - self, seed_ids: List[NodeId] - ) -> List[in_memory_graph_lib.InMemoryGraph]: - json_results = self.sample_to_json(seed_ids) - return _json_to_in_memory_graphs( - json_results, self._schema, seed_ids, self._plan.root.nodeset + return _cte_result_to_in_memory_graphs( + json_results, + self._schema, + seed_ids, + self._plan.root.nodeset, ) @@ -133,39 +374,7 @@ def create_graph_spanner_sampler( schema: schema_lib.GraphSchema, debug_sampling: bool = False, ) -> SpannerGraphSampler: - """Creates a SpannerGraphSampler instance. - - Example: - ```python - schema = schema_lib.GraphSchema(...) 
- config = config_lib.SimpleSamplingConfig( - seed_nodeset='users', - num_hops=2, - hop_width=10 - ) - sampler = create_graph_spanner_sampler( - project='my-project', - instance='my-instance', - database='my-database', - graph='my-graph', - plan=config, - schema=schema - ) - subgraphs = sampler.sample(['user1', 'user2']) - ``` - - Args: - project: Google Cloud project ID. - instance: Spanner instance ID. - database: Spanner database ID. - graph: Spanner graph ID. - plan: Sampling plan or simple sampling config. - schema: Graph schema. - debug_sampling: Whether to enable debug mode for sampling. - - Returns: - A SpannerGraphSampler instance. - """ + """Creates a SpannerGraphSampler instance.""" if isinstance(plan, config_lib.SimpleSamplingConfig): plan = config_lib.simple_sampling_config_to_sampling_plan(plan, schema) @@ -182,122 +391,6 @@ def create_graph_spanner_sampler( ) -def _generate_gql_query( - graph_name: str, - plan: config_lib.SamplingPlan, - schema: schema_lib.GraphSchema, - seed_ids: List[NodeId], - debug_sampling: bool, -) -> str: - """Generates a GQL query string for sampling from Spanner Graphs. - - The GQL query will return a different row for each sampling path i.e. leaf - nodes. - - TODO(gbm): Can we avoid retuning multiple times the feature values (i.e. - properties) of the non-leaf nodes? - - Args: - graph_name: Spanner graph ID. - plan: Sampling plan. - schema: Graph schema. - seed_ids: List of seed node IDs. - debug_sampling: Whether to enable debug mode for sampling. - - Returns: - A GQL query string. 
- """ - - primary_key_feature = analyse_schema_lib.primary_feature( - plan.root.nodeset, schema.node_sets[plan.root.nodeset] - ) - - seed_ids_str = ", ".join([f"'{seed_id}'" for seed_id in seed_ids]) - query_parts = [f"GRAPH {graph_name}"] - - def count_edges(node): - return len(node.children) + sum(count_edges(c.node) for c in node.children) - - total_steps = count_edges(plan.root) - - if len(seed_ids) == 1: - root_match = f"MATCH (n0 {{{primary_key_feature}: '{seed_ids[0]}'}})" - else: - root_match = ( - f"MATCH (n0 WHERE n0.{primary_key_feature} IN ({seed_ids_str}))" - ) - query_parts.append(root_match) - - if total_steps == 0: - query_parts.append("RETURN DISTINCT TO_JSON(n0) AS n0") - return "\n".join(query_parts) - - paths = [] - var_count = 0 - all_vars = ["n0"] - - def build_paths( - node: config_lib.PlanNode, parent_var: str, current_path: str - ): - nonlocal var_count - if not node.children: - if current_path: - paths.append(current_path) - return - - for i, edge in enumerate(node.children): - var_count += 1 - child_var = f"n{var_count}" - edge_var = f"e{var_count}" - all_vars.extend([child_var, edge_var]) - - order_by = "" if debug_sampling else " ORDER BY GENERATE_UUID()" - if edge.reversed: - edge_pattern = ( - f"<-[{edge_var}:{edge.edgeset} WHERE {edge_var} IN {{\n MATCH" - f" <-[selected_e:{edge.edgeset}]-()\n FILTER" - f" IS_FIRST({edge.hop_width}) OVER (PARTITION BY" - f" DESTINATION_NODE_ID(selected_e){order_by})\n RETURN" - f" selected_e\n }}]-({child_var})" - ) - else: - edge_pattern = ( - f"-[{edge_var}:{edge.edgeset} WHERE {edge_var} IN {{\n MATCH" - f" -[selected_e:{edge.edgeset}]->()\n FILTER" - f" IS_FIRST({edge.hop_width}) OVER (PARTITION BY" - f" SOURCE_NODE_ID(selected_e){order_by})\n RETURN selected_e\n " - f" }}]->({child_var})" - ) - - if i == 0: - next_path = ( - f"{current_path}{edge_pattern}" - if current_path == "(n0)" - else f"{current_path} {edge_pattern}" - if current_path - else f"({parent_var}) {edge_pattern}" - ) - else: - 
next_path = f"({parent_var}) {edge_pattern}" - - build_paths(edge.node, child_var, next_path) - - build_paths(plan.root, "n0", "(n0)") - - query_parts.append("OPTIONAL MATCH " + "\n\nOPTIONAL MATCH ".join(paths)) - - nodes_to_ret = [v for v in all_vars if v.startswith("n")] - edges_to_ret = [v for v in all_vars if v.startswith("e")] - ret_str = ", ".join( - [f"TO_JSON({v}) AS {v}" for v in nodes_to_ret + edges_to_ret] - ) - query_parts.append(f"RETURN DISTINCT {ret_str}") - - return "\n".join(query_parts) - - -# TODO(gbm): Use "_graph_element_to_features" in -# "src/io/gcp/common.py" def _json_features_to_features( values: Dict[str, List[Any]], schema: schema_lib.FeatureSetSchema ) -> Dict[str, np.ndarray]: @@ -307,41 +400,36 @@ def _json_features_to_features( dtype = feature_format.FEATURE_FORMAT_TO_NP_DTYPE[feature_schema.format] value = np.array(values[feature_name], dtype=dtype) if feature_schema.shape: - shape = [-1] + list(feature_schema.shape) - value = value.reshape(shape) + resolved_shape = [0 if value.size == 0 else -1] + for i, dim in enumerate(feature_schema.shape): + if dim is None: + if i + 1 < value.ndim: + resolved_shape.append(value.shape[i + 1]) + else: + resolved_shape.append(0) + else: + resolved_shape.append(dim) + value = value.reshape(resolved_shape) result[feature_name] = value return result -# TODO(gbm): This method converts a list of paths into a graph. This is slow. -# Ultimately, we want for the GQL query to return directly a graph (e.g., list -# of nodes and list of edges). When done, we will be able to use the -# methods in "dgf/src/io/gcp/common.py". -def _json_to_in_memory_graphs( +def _cte_result_to_in_memory_graphs( result: Any, schema: schema_lib.GraphSchema, - seed_ids: List[NodeId], + seed_ids: List[bytes], root_nodeset: str, ) -> List[in_memory_graph_lib.InMemoryGraph]: - """Converts the spanner graph JSON results into in memory graphs. - - Args: - result: The raw JSON result from the Spanner GQL query. 
- schema: The graph schema. - seed_ids: List of seed node IDs. - root_nodeset: Seeded nodeset. - - Returns: - A list of InMemoryGraph objects. - """ - - primary_key_feature = analyse_schema_lib.primary_feature( - root_nodeset, schema.node_sets[root_nodeset] - ) - raw_graphs_per_seed: Dict[NodeId, RawGraph] = {} - - # Initialises the accumulators - for seed_id in seed_ids: + """Converts the flat CTE query results (with features) into InMemoryGraphs.""" + root_pk = _get_primary_key(root_nodeset, schema) + root_pk_schema = schema.node_sets[root_nodeset].features[root_pk] + normalized_seeds = [ + _normalize_node_id(sid, root_pk_schema) for sid in seed_ids + ] + + # 1. Initialize RawGraphs for each seed + raw_graphs_per_seed: Dict[bytes, RawGraph] = {} + for seed_id in normalized_seeds: raw_graphs_per_seed[seed_id] = RawGraph( node_sets={ nodeset_name: RawNodeset( @@ -362,59 +450,110 @@ def _json_to_in_memory_graphs( }, ) - # For each sampling path. + # Intermediate structure to store edges before we can resolve node indices. + pending_edges = collections.defaultdict(list) + + # First Pass: Collect all nodes and their features, and store pending edges. for row in result: - seed = row[0] - if not seed: + seed_id_bytes = row[0] + if seed_id_bytes is None: + continue + seed_id = _normalize_node_id( + seed_id_bytes, root_pk_schema, is_from_spanner=True + ) + if seed_id not in raw_graphs_per_seed: continue - - seed_id = seed["properties"][primary_key_feature] raw_graph = raw_graphs_per_seed[seed_id] - for n in range(len(row)): - item = row[n] - if not item: - continue - - if item["kind"] == "node": - # For each node in the sampling path. 
- nodeset_name = item["labels"][0] - node_id = item["identifier"] - nodeset = raw_graph.node_sets[nodeset_name] - raw_features = item["properties"] - - if node_id not in raw_graph.spanner_node_ids: - raw_graph.spanner_node_ids[node_id] = nodeset.num_nodes - nodeset.num_nodes += 1 - for feature_name in nodeset.features: - nodeset.features[feature_name].append(raw_features[feature_name]) + optype = row[2] # element_type + element_class = row[3] # nodeset or edgeset name + properties_json = row[6] # properties_json (JSON) + + if isinstance(properties_json, str): + properties = json.loads(properties_json) if properties_json else {} + elif isinstance(properties_json, dict): + properties = properties_json + else: + properties = {} + + if optype == "node": + node_id_bytes = row[1] + nodeset_name = element_class + node_pk = _get_primary_key(nodeset_name, schema) + node_pk_schema = schema.node_sets[nodeset_name].features[node_pk] + node_id = _normalize_node_id( + node_id_bytes, node_pk_schema, is_from_spanner=True + ) - if item["kind"] == "edge": - # For each edge in the sampling path. - edgeset_name = item["labels"][0] - spanner_source_id = item["source_node_identifier"] - spanner_target_id = item["destination_node_identifier"] + nodeset = raw_graph.node_sets[nodeset_name] + + if node_id not in raw_graph.spanner_node_ids: + raw_graph.spanner_node_ids[node_id] = nodeset.num_nodes + nodeset.num_nodes += 1 + for feature_name in nodeset.features: + if feature_name == node_pk: + val = node_id + else: + # Extract feature from properties. 
+ val = properties.get(feature_name) + nodeset.features[feature_name].append(val) + + elif optype == "edge": + edgeset_name = element_class + source_id_bytes = row[4] + target_id_bytes = row[5] + + edgeset_schema = schema.edge_sets[edgeset_name] + src_pk = _get_primary_key(edgeset_schema.source, schema) + trg_pk = _get_primary_key(edgeset_schema.target, schema) + src_pk_schema = schema.node_sets[edgeset_schema.source].features[src_pk] + trg_pk_schema = schema.node_sets[edgeset_schema.target].features[trg_pk] + + source_id = _normalize_node_id( + source_id_bytes, src_pk_schema, is_from_spanner=True + ) + target_id = _normalize_node_id( + target_id_bytes, trg_pk_schema, is_from_spanner=True + ) - edge_id = item["identifier"] + pending_edges[seed_id].append( + (edgeset_name, source_id, target_id, properties) + ) - if edge_id not in raw_graph.spanner_edge_ids: - raw_graph.spanner_edge_ids.add(edge_id) + # Second Pass: Process edges and build final InMemoryGraphs + in_memory_graphs = [] + for seed_id in normalized_seeds: + raw_graph = raw_graphs_per_seed[seed_id] - edgeset = raw_graph.edge_sets[edgeset_name] - raw_features = item["properties"] + # 1. Process pending edges for this seed + for edgeset_name, source_id, target_id, properties in pending_edges[ + seed_id + ]: + edgeset_schema = schema.edge_sets[edgeset_name] + + # Ensure source and target nodes exist in the graph. 
+ if ( + source_id not in raw_graph.spanner_node_ids + or target_id not in raw_graph.spanner_node_ids + ): + continue - source_idx = raw_graph.spanner_node_ids[spanner_source_id] - target_idx = raw_graph.spanner_node_ids[spanner_target_id] - edgeset.adjacency.append((source_idx, target_idx)) + source_idx = raw_graph.spanner_node_ids[source_id] + target_idx = raw_graph.spanner_node_ids[target_id] - for feature_name in edgeset.features: - edgeset.features[feature_name].append(raw_features[feature_name]) + edgeset = raw_graph.edge_sets[edgeset_name] - in_memory_graphs = [] + # We need a unique edge ID to avoid duplicates + edge_id = properties.get("identifier", f"{source_id}-{target_id}") + if edge_id not in raw_graph.spanner_edge_ids: + raw_graph.spanner_edge_ids.add(edge_id) + edgeset.adjacency.append((source_idx, target_idx)) - for seed_id in seed_ids: - raw_graph = raw_graphs_per_seed[seed_id] + for feature_name in edgeset.features: + val = properties.get(feature_name) + edgeset.features[feature_name].append(val) + # 2. 
Reconstruct NodeSets and EdgeSets node_sets = {} for nodeset_name, nodeset_schema in schema.node_sets.items(): raw_nodeset = raw_graph.node_sets[nodeset_name] diff --git a/dgf/src/sampling/gcp/spanner_graph_sampler_integration_test.py b/dgf/src/sampling/gcp/spanner_graph_sampler_integration_test.py index ce92371..42e11a3 100644 --- a/dgf/src/sampling/gcp/spanner_graph_sampler_integration_test.py +++ b/dgf/src/sampling/gcp/spanner_graph_sampler_integration_test.py @@ -16,13 +16,18 @@ """ import time +from typing import List from absl.testing import absltest from absl.testing import parameterized +from dgf.src.analyse import schema as analyse_schema_lib from dgf.src.data import in_memory_graph as in_memory_graph_lib from dgf.src.data import schema as schema_lib -from dgf.src.plot import network as network_lib +from dgf.src.io import cache as cache_lib +from dgf.src.io.gcp import spanner_graph as spanner_graph_io_lib from dgf.src.sampling import config as config_lib +from dgf.src.sampling import in_memory_sampler as in_memory_sampler_lib from dgf.src.sampling.gcp import spanner_graph_sampler as spanner_graph_sampler_lib +from dgf.src.util import test_util from dgf.src.validate import in_memory_graph as in_memory_graph_validate_lib import numpy as np @@ -30,90 +35,276 @@ InMemoryNodeSet = in_memory_graph_lib.InMemoryNodeSet InMemoryEdgeSet = in_memory_graph_lib.InMemoryEdgeSet +DEBUG_SAMPLING = True # If true, uses deterministic sampling. 
-def _toy_schema(has_feat: bool = False): - features = { - "id": schema_lib.FeatureSchema( - format=schema_lib.FeatureFormat.BYTES, - semantic=schema_lib.FeatureSemantic.PRIMARY_ID, - ), - "labels": schema_lib.FeatureSchema( - format=schema_lib.FeatureFormat.INTEGER_64, - semantic=schema_lib.FeatureSemantic.CATEGORICAL, - ), - "year": schema_lib.FeatureSchema( - format=schema_lib.FeatureFormat.INTEGER_64, - semantic=schema_lib.FeatureSemantic.NUMERICAL, - ), - } - if has_feat: - features["feat"] = schema_lib.FeatureSchema( - format=schema_lib.FeatureFormat.FLOAT_32, - semantic=schema_lib.FeatureSemantic.EMBEDDING, - shape=(128,), - ) - return schema_lib.GraphSchema( - node_sets={"nodes": schema_lib.NodeSchema(features=features)}, - edge_sets={ - "edges": schema_lib.EdgeSchema(source="nodes", target="nodes") - }, + +def normalize_graph( + graph: in_memory_graph_lib.InMemoryGraph, schema: schema_lib.GraphSchema +) -> in_memory_graph_lib.InMemoryGraph: + node_mappings = {} + new_node_sets = {} + + for nodeset_name, nodeset in graph.node_sets.items(): + primary_key_feature = analyse_schema_lib.primary_feature( + nodeset_name, schema.node_sets[nodeset_name] + ) + ids = nodeset.features[primary_key_feature] + sorted_idxs = np.argsort(ids) + + old_to_new = np.zeros_like(sorted_idxs) + for new_idx, old_idx in enumerate(sorted_idxs): + old_to_new[old_idx] = new_idx + node_mappings[nodeset_name] = old_to_new + + new_features = {} + for feat_name, feat_val in nodeset.features.items(): + new_features[feat_name] = feat_val[sorted_idxs] + + new_node_sets[nodeset_name] = in_memory_graph_lib.InMemoryNodeSet( + num_nodes=nodeset.num_nodes, + features=new_features, + ) + + new_edge_sets = {} + for edgeset_name, edgeset in graph.edge_sets.items(): + edgeset_schema = schema.edge_sets[edgeset_name] + source_nodeset = edgeset_schema.source + target_nodeset = edgeset_schema.target + + src_map = node_mappings[source_nodeset] + trg_map = node_mappings[target_nodeset] + + adj = 
edgeset.adjacency + new_adj = np.zeros_like(adj) + new_adj[0] = src_map[adj[0]] + new_adj[1] = trg_map[adj[1]] + + sorted_edge_idxs = np.lexsort((new_adj[1], new_adj[0])) + new_adj = new_adj[:, sorted_edge_idxs] + + new_features = {} + for feat_name, feat_val in edgeset.features.items(): + new_features[feat_name] = feat_val[sorted_edge_idxs] + + new_edge_sets[edgeset_name] = in_memory_graph_lib.InMemoryEdgeSet( + adjacency=new_adj, + features=new_features, + ) + + return in_memory_graph_lib.InMemoryGraph( + node_sets=new_node_sets, + edge_sets=new_edge_sets, ) -class E2ETest(parameterized.TestCase): +def print_graph_stats( + name: str, + samples: List[in_memory_graph_lib.InMemoryGraph], + seed_ids: List[bytes], +): + for sample, seed_id in zip(samples, seed_ids): + num_nodes = sum(nodeset.num_nodes for nodeset in sample.node_sets.values()) + num_edges = sum( + edgeset.num_edges() for edgeset in sample.edge_sets.values() + ) + seed_id_str = ( + seed_id.decode("utf-8") if isinstance(seed_id, bytes) else str(seed_id) + ) + print( + f"\t{name} sample for seed {seed_id_str}: {num_nodes} nodes," + f" {num_edges} edges" + ) + - def test_manual_sample_arxiv(self): - r"""Manual sampling unit test.""" - # Note: The Arxiv spanner graph has a slighly different schema than the - # one returned by "fetch_ogb_graph". 
- # - # TODO(gbm): Infer the schema from the spanner graph when available: - # "spanner_graph_in_memory.read_schema_from_spanner_graph" - schema = _toy_schema() - spanner_graph_config = { +class E2EArxivTest(parameterized.TestCase): + + @classmethod + def setUpClass(cls): + super().setUpClass() + cls.spanner_graph_config = { "project": "biggraphs-poc", "instance": "gcp-gnns", "database": "ogbn_arxiv", "graph": "ogbn_arxiv", } + print("Downloading graph from Spanner (or loading from cache)...") + start_time = time.time() + cls.graph, cls.schema = cache_lib.cache( + "/tmp/spanner_graph_sampler_integration_test_arxiv_cache_v2.pkl", + lambda: spanner_graph_io_lib.read_spanner_graph( + **cls.spanner_graph_config, verbose=2 + ), + ) + print(f"Graph loaded in {time.time() - start_time:.2f} seconds.") + + def test_benchmark_and_equivalence(self): + seed_ids = [b"10", b"11"] + plan = config_lib.SimpleSamplingConfig( seed_nodeset="nodes", + num_hops=3, + hop_width=10, + reverse=True, + ) + + # CTE Sampler + cte_sampler = spanner_graph_sampler_lib.create_graph_spanner_sampler( + schema=self.schema, + plan=plan, + **self.spanner_graph_config, + debug_sampling=DEBUG_SAMPLING, + ) + start_time = time.time() + cte_samples = cte_sampler.sample(seed_ids) + cte_time = time.time() - start_time + in_memory_graph_validate_lib.validate_graph( + cte_samples[0], self.schema, raise_on_warning=False + ) + print(f"CTE sampling took {cte_time:.4f} seconds") + print_graph_stats("CTE", cte_samples, seed_ids) + + # In-Memory Sampler + plan_tree = config_lib.simple_sampling_config_to_sampling_plan( + plan, self.schema + ) + in_memory_sampler = in_memory_sampler_lib.create_sampler( + self.graph, + plan_tree, + self.schema, + batch_size=len(seed_ids), + num_threads=0, + debug_sampling=DEBUG_SAMPLING, + ) + + # Map seed IDs to indices for in-memory sampler + node_ids = self.graph.node_sets["nodes"].features["id"] + seed_bytes = seed_ids + seed_indices = [] + for sb in seed_bytes: + idx = 
np.where(node_ids == sb)[0] + if len(idx) == 0: + raise ValueError(f"Seed ID {sb} not found in graph") + seed_indices.append(idx[0]) + + start_time = time.time() + in_mem_samples = in_memory_sampler.sample(seed_indices) + in_mem_time = time.time() - start_time + in_memory_graph_validate_lib.validate_graph( + in_mem_samples[0], self.schema, raise_on_warning=False + ) + print(f"In-Memory sampling took {in_mem_time:.4f} seconds\n") + print_graph_stats("In-Memory", in_mem_samples, seed_ids) + + self.assertEqual(len(cte_samples), len(seed_ids)) + self.assertEqual(len(in_mem_samples), len(seed_ids)) + + if DEBUG_SAMPLING: + for i in range(len(seed_ids)): + norm_cte = normalize_graph(cte_samples[i], self.schema) + norm_in_mem = normalize_graph(in_mem_samples[i], self.schema) + + # Compare In-Memory vs CTE + test_util.assert_are_equal(self, norm_in_mem, norm_cte) + + +class E2EMagTest(parameterized.TestCase): + + @classmethod + def setUpClass(cls): + super().setUpClass() + cls.spanner_graph_config = { + "project": "biggraphs-poc", + "instance": "gcp-gnns", + "database": "ogbn_mag", + "graph": "ogbn_mag", + } + + cls.test_in_process = False + + if cls.test_in_process: + print("Downloading graph from Spanner (or loading from cache)...") + start_time = time.time() + cls.graph, cls.schema = cache_lib.cache( + "/tmp/spanner_graph_sampler_integration_test_mag_cache_v2.pkl", + lambda: spanner_graph_io_lib.read_spanner_graph( + **cls.spanner_graph_config, verbose=2, max_workers=4 + ), + ) + print(f"Graph loaded in {time.time() - start_time:.2f} seconds.") + + def test_benchmark_and_equivalence(self): + seed_ids = [b"paper0", b"paper1"] + + plan = config_lib.SimpleSamplingConfig( + seed_nodeset="paper", num_hops=2, - hop_width=2, + hop_width=5, reverse=True, ) - spanner_sampler = spanner_graph_sampler_lib.create_graph_spanner_sampler( - schema=schema, plan=plan, **spanner_graph_config - ) - - for run_idx, seed_ids in enumerate([["10", "11"]]): - durations = [] - for _ in range(1): 
- start_time = time.time() - spanner_samples = spanner_sampler.sample(seed_ids) - end_time = time.time() - duration = end_time - start_time - durations.append(duration) - print(f"Sampling took {duration:.2f} seconds") - print(f"Average sampling time: {np.mean(durations):.2f} seconds") - - for seed_id, spanner_sample in zip(seed_ids, spanner_samples): - in_memory_graph_validate_lib.validate_graph( - spanner_sample, schema, raise_on_warning=False - ) - self.assertEqual( - spanner_sample.node_sets["nodes"].features["id"][0].item(), - seed_id.encode("utf-8"), - ) - - p = network_lib.plot_graph(spanner_sample, schema, features=False) - p.render( - f"/tmp/gf/graph_run{run_idx}_seed{seed_id}", - format="png", - cleanup=True, - ) + # CTE Sampler + spanner_schema = spanner_graph_io_lib.read_spanner_graph_schema( + **self.spanner_graph_config + ) + cte_sampler = spanner_graph_sampler_lib.create_graph_spanner_sampler( + schema=spanner_schema, + plan=plan, + **self.spanner_graph_config, + debug_sampling=DEBUG_SAMPLING, + ) + start_time = time.time() + cte_samples = cte_sampler.sample(seed_ids) + cte_time = time.time() - start_time + in_memory_graph_validate_lib.validate_graph( + cte_samples[0], spanner_schema, raise_on_warning=False + ) + print(f"CTE sampling took {cte_time:.4f} seconds") + print_graph_stats("CTE", cte_samples, seed_ids) + + if self.test_in_process: + # In-Memory Sampler + plan_tree = config_lib.simple_sampling_config_to_sampling_plan( + plan, self.schema + ) + in_memory_sampler = in_memory_sampler_lib.create_sampler( + self.graph, + plan_tree, + self.schema, + batch_size=len(seed_ids), + num_threads=0, + debug_sampling=DEBUG_SAMPLING, + ) + + # Map seed IDs to indices for in-memory sampler + node_ids = self.graph.node_sets["paper"].features["id"] + seed_bytes = seed_ids + seed_indices = [] + for sb in seed_bytes: + idx = np.where(node_ids == sb)[0] + if len(idx) == 0: + raise ValueError(f"Seed ID {sb} not found in graph") + seed_indices.append(idx[0]) + + 
start_time = time.time() + in_mem_samples = in_memory_sampler.sample(seed_indices) + in_mem_time = time.time() - start_time + in_memory_graph_validate_lib.validate_graph( + in_mem_samples[0], self.schema, raise_on_warning=False + ) + print(f"In-Memory sampling took {in_mem_time:.4f} seconds\n") + print_graph_stats("In-Memory", in_mem_samples, seed_ids) + + self.assertEqual(len(cte_samples), len(seed_ids)) + self.assertEqual(len(in_mem_samples), len(seed_ids)) + + if DEBUG_SAMPLING: + for i in range(len(seed_ids)): + norm_cte = normalize_graph(cte_samples[i], self.schema) + norm_in_mem = normalize_graph(in_mem_samples[i], self.schema) + + # Compare In-Memory vs CTE + test_util.assert_are_equal(self, norm_in_mem, norm_cte) if __name__ == "__main__": diff --git a/dgf/src/sampling/gcp/spanner_graph_sampler_test.py b/dgf/src/sampling/gcp/spanner_graph_sampler_test.py index b139e2d..d54ec94 100644 --- a/dgf/src/sampling/gcp/spanner_graph_sampler_test.py +++ b/dgf/src/sampling/gcp/spanner_graph_sampler_test.py @@ -59,7 +59,7 @@ def _toy_schema(has_feat: bool = False): class SpannerGraphSamplerTest(parameterized.TestCase): - def test_generate_gql_query(self): + def test_generate_cte_query(self): schema = schema_lib.GraphSchema( node_sets={ "N1": schema_lib.NodeSchema( @@ -89,149 +89,117 @@ def test_generate_gql_query(self): ) plan_tree = config_lib.simple_sampling_config_to_sampling_plan(plan, schema) - query = spanner_graph_sampler_lib._generate_gql_query( + query = spanner_graph_sampler_lib.CteQueryGenerator( graph_name="my_graph", - plan=plan_tree, schema=schema, - seed_ids=["10", "11"], + plan=plan_tree, debug_sampling=False, - ) - - expected_query_path = os.path.join( - test_util.dgf_test_data_path(), - "spanner_graph_sampler_test_generate_gql_query.txt", - ) - with open(expected_query_path, "r") as f: - expected_query = f.read() - - self.assertEqual(query, expected_query) + ).generate() - def test_json_to_in_memory_graphs(self): - # Note: 
"spanner_graph_sampler_test_json_to_in_memory_graphs" as been - # generated using the e2e manual test below. - raw_query_path = os.path.join( - test_util.dgf_test_data_path(), - "spanner_graph_sampler_test_json_to_in_memory_graphs.json", - ) - with open(raw_query_path, "r") as f: - query = json.load(f) - schema = _toy_schema() - graphs = spanner_graph_sampler_lib._json_to_in_memory_graphs( - query, schema, ["10", "11"], "nodes" - ) - - test_util.assert_are_equal( + test_util.assert_golden_string( self, - graphs, - [ - InMemoryGraph( - node_sets={ - "nodes": InMemoryNodeSet( - num_nodes=5, - features={ - "id": np.array( - [b"10", b"10090", b"92331", b"25350", b"46510"], - ), - "labels": np.array([24, 34, 8, 8, 24]), - "year": np.array([2012, 2007, 2013, 2011, 2013]), - }, - ) - }, - edge_sets={ - "edges": InMemoryEdgeSet( - adjacency=np.array([[0, 2, 0, 4], [1, 0, 3, 0]]) - ) - }, - ), - InMemoryGraph( - node_sets={ - "nodes": InMemoryNodeSet( - num_nodes=3, - features={ - "id": np.array([b"11", b"54035", b"142591"]), - "labels": np.array([36, 36, 36]), - "year": np.array([2015, 2012, 2012]), - }, - ) - }, - edge_sets={ - "edges": InMemoryEdgeSet( - adjacency=np.array([[0, 0], [1, 2]]) - ) - }, - ), - ], + query, + "spanner_graph_sampler_test_generate_cte_query.txt", + strip=True, ) - def test_json_to_in_memory_graphs_deduplication(self): + def test_generate_cte_query_directed(self): schema = schema_lib.GraphSchema( node_sets={ - "nodes": schema_lib.NodeSchema( + "N1": schema_lib.NodeSchema( features={ "id": schema_lib.FeatureSchema( format=schema_lib.FeatureFormat.BYTES, semantic=schema_lib.FeatureSemantic.PRIMARY_ID, ) } - ) + ), + "N2": schema_lib.NodeSchema( + features={ + "id": schema_lib.FeatureSchema( + format=schema_lib.FeatureFormat.BYTES, + semantic=schema_lib.FeatureSemantic.PRIMARY_ID, + ) + } + ), }, edge_sets={ - "edges": schema_lib.EdgeSchema(source="nodes", target="nodes") + "E1": schema_lib.EdgeSchema(source="N1", target="N1"), + "E2": 
schema_lib.EdgeSchema(source="N1", target="N2"), }, ) + plan = config_lib.SimpleSamplingConfig( + seed_nodeset="N1", num_hops=2, hop_width=2, reverse=False + ) + plan_tree = config_lib.simple_sampling_config_to_sampling_plan(plan, schema) + + query = spanner_graph_sampler_lib.CteQueryGenerator( + graph_name="my_graph", + schema=schema, + plan=plan_tree, + debug_sampling=False, + ).generate() - # Mock JSON result with duplicate edges. - # Row 0: Seed '10', Path with edge 'e1' from '10' to '20'. - # Row 1: Seed '10', Another path with the SAME edge 'e1' from '10' to '20'. - query = [ + test_util.assert_golden_string( + self, + query, + "spanner_graph_sampler_test_generate_cte_query_directed.txt", + strip=True, + ) + + def test_cte_result_to_in_memory_graphs(self): + # Mock CTE flat rows. + # Columns: [seed_id, node_id, element_type, element_class, source_id, target_id, properties_json] + # IDs are base64 encoded bytes as returned by Spanner for BYTES columns. + # '10' -> b'MTA=' + # '11' -> b'MTE=' + # '10090' -> b'MTAwOTA=' + # '92331' -> b'OTIzMzE=' + query_results = [ + # Seed 10 + [ + b"MTA=", + b"MTA=", + "node", + "nodes", + None, + None, + '{"labels": 24, "year": 2012}', + ], + [b"MTA=", None, "edge", "edges", b"MTA=", b"MTAwOTA=", "{}"], + [ + b"MTA=", + b"MTAwOTA=", + "node", + "nodes", + None, + None, + '{"labels": 34, "year": 2007}', + ], + [b"MTA=", None, "edge", "edges", b"MTA=", b"OTIzMzE=", "{}"], [ - { - "kind": "node", - "labels": ["nodes"], - "identifier": "10", - "properties": {"id": "10"}, - }, - { - "kind": "node", - "labels": ["nodes"], - "identifier": "20", - "properties": {"id": "20"}, - }, - { - "kind": "edge", - "labels": ["edges"], - "identifier": "e1", - "source_node_identifier": "10", - "destination_node_identifier": "20", - "properties": {}, - }, + b"MTA=", + b"OTIzMzE=", + "node", + "nodes", + None, + None, + '{"labels": 8, "year": 2013}', ], + # Seed 11 [ - { - "kind": "node", - "labels": ["nodes"], - "identifier": "10", - 
"properties": {"id": "10"}, - }, - { - "kind": "node", - "labels": ["nodes"], - "identifier": "20", - "properties": {"id": "20"}, - }, - { - "kind": "edge", - "labels": ["edges"], - "identifier": "e1", - "source_node_identifier": "10", - "destination_node_identifier": "20", - "properties": {}, - }, + b"MTE=", + b"MTE=", + "node", + "nodes", + None, + None, + '{"labels": 36, "year": 2015}', ], ] - - graphs = spanner_graph_sampler_lib._json_to_in_memory_graphs( - query, schema, ["10"], "nodes" + schema = _toy_schema() + graphs = spanner_graph_sampler_lib._cte_result_to_in_memory_graphs( + query_results, schema, [b"10", b"11"], "nodes" ) test_util.assert_are_equal( @@ -241,14 +209,35 @@ def test_json_to_in_memory_graphs_deduplication(self): InMemoryGraph( node_sets={ "nodes": InMemoryNodeSet( - num_nodes=2, + num_nodes=3, features={ - "id": np.array([b"10", b"20"]), + "id": np.array([b"10", b"10090", b"92331"]), + "labels": np.array([24, 34, 8]), + "year": np.array([2012, 2007, 2013]), }, ) }, edge_sets={ - "edges": InMemoryEdgeSet(adjacency=np.array([[0], [1]])) + "edges": InMemoryEdgeSet( + adjacency=np.array([[0, 0], [1, 2]]) + ) + }, + ), + InMemoryGraph( + node_sets={ + "nodes": InMemoryNodeSet( + num_nodes=1, + features={ + "id": np.array([b"11"]), + "labels": np.array([36]), + "year": np.array([2015]), + }, + ) + }, + edge_sets={ + "edges": InMemoryEdgeSet( + adjacency=np.zeros((2, 0), dtype=np.int64) + ) }, ), ], diff --git a/test_data/spanner_graph_sampler_test_generate_cte_query.txt b/test_data/spanner_graph_sampler_test_generate_cte_query.txt new file mode 100644 index 0000000..2a8f679 --- /dev/null +++ b/test_data/spanner_graph_sampler_test_generate_cte_query.txt @@ -0,0 +1,251 @@ +WITH + source_node AS ( + SELECT n0_id AS seed_id, n0_id AS n0_id + FROM GRAPH_TABLE(my_graph + MATCH (n0:N1 WHERE n0.id IN UNNEST(@seed_ids)) + RETURN n0.id AS n0_id + ) +), + hop_1 AS ( + SELECT p.seed_id, gt.edge_json, gt.child_id AS n1_id, p.n0_id AS parent_id + FROM 
source_node p + JOIN GRAPH_TABLE(my_graph + MATCH (gp:N1)-[ge:E1]->(gc:N1) + FILTER IS_FIRST(2) OVER (PARTITION BY gp.id ORDER BY GENERATE_UUID()) + RETURN gp.id AS parent_id, TO_JSON(ge) AS edge_json, gc.id AS child_id + ) AS gt ON p.n0_id = gt.parent_id +), + hop_2 AS ( + SELECT p.seed_id, gt.edge_json, gt.child_id AS n2_id, p.n1_id AS parent_id + FROM hop_1 p + JOIN GRAPH_TABLE(my_graph + MATCH (gp:N1)-[ge:E1]->(gc:N1) + FILTER IS_FIRST(2) OVER (PARTITION BY gp.id ORDER BY GENERATE_UUID()) + RETURN gp.id AS parent_id, TO_JSON(ge) AS edge_json, gc.id AS child_id + ) AS gt ON p.n1_id = gt.parent_id +), + hop_3 AS ( + SELECT p.seed_id, gt.edge_json, gt.child_id AS n3_id, p.n1_id AS parent_id + FROM hop_1 p + JOIN GRAPH_TABLE(my_graph + MATCH (gp:N1)<-[ge:E1]-(gc:N1) + FILTER IS_FIRST(2) OVER (PARTITION BY gp.id ORDER BY GENERATE_UUID()) + RETURN gp.id AS parent_id, TO_JSON(ge) AS edge_json, gc.id AS child_id + ) AS gt ON p.n1_id = gt.parent_id +), + hop_4 AS ( + SELECT p.seed_id, gt.edge_json, gt.child_id AS n4_id, p.n1_id AS parent_id + FROM hop_1 p + JOIN GRAPH_TABLE(my_graph + MATCH (gp:N1)-[ge:E2]->(gc:N2) + FILTER IS_FIRST(2) OVER (PARTITION BY gp.id ORDER BY GENERATE_UUID()) + RETURN gp.id AS parent_id, TO_JSON(ge) AS edge_json, gc.id AS child_id + ) AS gt ON p.n1_id = gt.parent_id +), + hop_5 AS ( + SELECT p.seed_id, gt.edge_json, gt.child_id AS n5_id, p.n0_id AS parent_id + FROM source_node p + JOIN GRAPH_TABLE(my_graph + MATCH (gp:N1)<-[ge:E1]-(gc:N1) + FILTER IS_FIRST(2) OVER (PARTITION BY gp.id ORDER BY GENERATE_UUID()) + RETURN gp.id AS parent_id, TO_JSON(ge) AS edge_json, gc.id AS child_id + ) AS gt ON p.n0_id = gt.parent_id +), + hop_6 AS ( + SELECT p.seed_id, gt.edge_json, gt.child_id AS n6_id, p.n5_id AS parent_id + FROM hop_5 p + JOIN GRAPH_TABLE(my_graph + MATCH (gp:N1)-[ge:E1]->(gc:N1) + FILTER IS_FIRST(2) OVER (PARTITION BY gp.id ORDER BY GENERATE_UUID()) + RETURN gp.id AS parent_id, TO_JSON(ge) AS edge_json, gc.id AS child_id + ) AS gt ON 
p.n5_id = gt.parent_id +), + hop_7 AS ( + SELECT p.seed_id, gt.edge_json, gt.child_id AS n7_id, p.n5_id AS parent_id + FROM hop_5 p + JOIN GRAPH_TABLE(my_graph + MATCH (gp:N1)<-[ge:E1]-(gc:N1) + FILTER IS_FIRST(2) OVER (PARTITION BY gp.id ORDER BY GENERATE_UUID()) + RETURN gp.id AS parent_id, TO_JSON(ge) AS edge_json, gc.id AS child_id + ) AS gt ON p.n5_id = gt.parent_id +), + hop_8 AS ( + SELECT p.seed_id, gt.edge_json, gt.child_id AS n8_id, p.n5_id AS parent_id + FROM hop_5 p + JOIN GRAPH_TABLE(my_graph + MATCH (gp:N1)-[ge:E2]->(gc:N2) + FILTER IS_FIRST(2) OVER (PARTITION BY gp.id ORDER BY GENERATE_UUID()) + RETURN gp.id AS parent_id, TO_JSON(ge) AS edge_json, gc.id AS child_id + ) AS gt ON p.n5_id = gt.parent_id +), + hop_9 AS ( + SELECT p.seed_id, gt.edge_json, gt.child_id AS n9_id, p.n0_id AS parent_id + FROM source_node p + JOIN GRAPH_TABLE(my_graph + MATCH (gp:N1)-[ge:E2]->(gc:N2) + FILTER IS_FIRST(2) OVER (PARTITION BY gp.id ORDER BY GENERATE_UUID()) + RETURN gp.id AS parent_id, TO_JSON(ge) AS edge_json, gc.id AS child_id + ) AS gt ON p.n0_id = gt.parent_id +), + hop_10 AS ( + SELECT p.seed_id, gt.edge_json, gt.child_id AS n10_id, p.n9_id AS parent_id + FROM hop_9 p + JOIN GRAPH_TABLE(my_graph + MATCH (gp:N2)<-[ge:E2]-(gc:N1) + FILTER IS_FIRST(2) OVER (PARTITION BY gp.id ORDER BY GENERATE_UUID()) + RETURN gp.id AS parent_id, TO_JSON(ge) AS edge_json, gc.id AS child_id + ) AS gt ON p.n9_id = gt.parent_id +) +SELECT seed_id, node_id, element_type, element_class, source_id, target_id, properties_json FROM ( +SELECT n0_id AS seed_id, n0_id AS node_id, 'node' AS element_type, 'N1' AS element_class, CAST(NULL AS BYTES) AS source_id, CAST(NULL AS BYTES) AS target_id, TO_JSON(n) AS properties_json FROM source_node AS h JOIN N1 AS n ON h.n0_id = n.id +UNION ALL +SELECT seed_id AS seed_id, CAST(NULL AS BYTES) AS node_id, 'edge' AS element_type, 'E1' AS element_class, parent_id AS source_id, n1_id AS target_id, edge_json AS properties_json FROM hop_1 +UNION ALL + + 
SELECT + seed_id AS seed_id, + n1_id AS node_id, + 'node' AS element_type, + 'N1' AS element_class, + CAST(NULL AS BYTES) AS source_id, + CAST(NULL AS BYTES) AS target_id, + TO_JSON(n) AS properties_json + FROM hop_1 AS h + JOIN N1 AS n ON h.n1_id = n.id + +UNION ALL +SELECT seed_id AS seed_id, CAST(NULL AS BYTES) AS node_id, 'edge' AS element_type, 'E1' AS element_class, parent_id AS source_id, n2_id AS target_id, edge_json AS properties_json FROM hop_2 +UNION ALL + + SELECT + seed_id AS seed_id, + n2_id AS node_id, + 'node' AS element_type, + 'N1' AS element_class, + CAST(NULL AS BYTES) AS source_id, + CAST(NULL AS BYTES) AS target_id, + TO_JSON(n) AS properties_json + FROM hop_2 AS h + JOIN N1 AS n ON h.n2_id = n.id + +UNION ALL +SELECT seed_id AS seed_id, CAST(NULL AS BYTES) AS node_id, 'edge' AS element_type, 'E1' AS element_class, n3_id AS source_id, parent_id AS target_id, edge_json AS properties_json FROM hop_3 +UNION ALL + + SELECT + seed_id AS seed_id, + n3_id AS node_id, + 'node' AS element_type, + 'N1' AS element_class, + CAST(NULL AS BYTES) AS source_id, + CAST(NULL AS BYTES) AS target_id, + TO_JSON(n) AS properties_json + FROM hop_3 AS h + JOIN N1 AS n ON h.n3_id = n.id + +UNION ALL +SELECT seed_id AS seed_id, CAST(NULL AS BYTES) AS node_id, 'edge' AS element_type, 'E2' AS element_class, parent_id AS source_id, n4_id AS target_id, edge_json AS properties_json FROM hop_4 +UNION ALL + + SELECT + seed_id AS seed_id, + n4_id AS node_id, + 'node' AS element_type, + 'N2' AS element_class, + CAST(NULL AS BYTES) AS source_id, + CAST(NULL AS BYTES) AS target_id, + TO_JSON(n) AS properties_json + FROM hop_4 AS h + JOIN N2 AS n ON h.n4_id = n.id + +UNION ALL +SELECT seed_id AS seed_id, CAST(NULL AS BYTES) AS node_id, 'edge' AS element_type, 'E1' AS element_class, n5_id AS source_id, parent_id AS target_id, edge_json AS properties_json FROM hop_5 +UNION ALL + + SELECT + seed_id AS seed_id, + n5_id AS node_id, + 'node' AS element_type, + 'N1' AS element_class, + 
CAST(NULL AS BYTES) AS source_id, + CAST(NULL AS BYTES) AS target_id, + TO_JSON(n) AS properties_json + FROM hop_5 AS h + JOIN N1 AS n ON h.n5_id = n.id + +UNION ALL +SELECT seed_id AS seed_id, CAST(NULL AS BYTES) AS node_id, 'edge' AS element_type, 'E1' AS element_class, parent_id AS source_id, n6_id AS target_id, edge_json AS properties_json FROM hop_6 +UNION ALL + + SELECT + seed_id AS seed_id, + n6_id AS node_id, + 'node' AS element_type, + 'N1' AS element_class, + CAST(NULL AS BYTES) AS source_id, + CAST(NULL AS BYTES) AS target_id, + TO_JSON(n) AS properties_json + FROM hop_6 AS h + JOIN N1 AS n ON h.n6_id = n.id + +UNION ALL +SELECT seed_id AS seed_id, CAST(NULL AS BYTES) AS node_id, 'edge' AS element_type, 'E1' AS element_class, n7_id AS source_id, parent_id AS target_id, edge_json AS properties_json FROM hop_7 +UNION ALL + + SELECT + seed_id AS seed_id, + n7_id AS node_id, + 'node' AS element_type, + 'N1' AS element_class, + CAST(NULL AS BYTES) AS source_id, + CAST(NULL AS BYTES) AS target_id, + TO_JSON(n) AS properties_json + FROM hop_7 AS h + JOIN N1 AS n ON h.n7_id = n.id + +UNION ALL +SELECT seed_id AS seed_id, CAST(NULL AS BYTES) AS node_id, 'edge' AS element_type, 'E2' AS element_class, parent_id AS source_id, n8_id AS target_id, edge_json AS properties_json FROM hop_8 +UNION ALL + + SELECT + seed_id AS seed_id, + n8_id AS node_id, + 'node' AS element_type, + 'N2' AS element_class, + CAST(NULL AS BYTES) AS source_id, + CAST(NULL AS BYTES) AS target_id, + TO_JSON(n) AS properties_json + FROM hop_8 AS h + JOIN N2 AS n ON h.n8_id = n.id + +UNION ALL +SELECT seed_id AS seed_id, CAST(NULL AS BYTES) AS node_id, 'edge' AS element_type, 'E2' AS element_class, parent_id AS source_id, n9_id AS target_id, edge_json AS properties_json FROM hop_9 +UNION ALL + + SELECT + seed_id AS seed_id, + n9_id AS node_id, + 'node' AS element_type, + 'N2' AS element_class, + CAST(NULL AS BYTES) AS source_id, + CAST(NULL AS BYTES) AS target_id, + TO_JSON(n) AS properties_json + 
FROM hop_9 AS h + JOIN N2 AS n ON h.n9_id = n.id + +UNION ALL +SELECT seed_id AS seed_id, CAST(NULL AS BYTES) AS node_id, 'edge' AS element_type, 'E2' AS element_class, n10_id AS source_id, parent_id AS target_id, edge_json AS properties_json FROM hop_10 +UNION ALL + + SELECT + seed_id AS seed_id, + n10_id AS node_id, + 'node' AS element_type, + 'N1' AS element_class, + CAST(NULL AS BYTES) AS source_id, + CAST(NULL AS BYTES) AS target_id, + TO_JSON(n) AS properties_json + FROM hop_10 AS h + JOIN N1 AS n ON h.n10_id = n.id + +) \ No newline at end of file diff --git a/test_data/spanner_graph_sampler_test_generate_cte_query_directed.txt b/test_data/spanner_graph_sampler_test_generate_cte_query_directed.txt new file mode 100644 index 0000000..086ba56 --- /dev/null +++ b/test_data/spanner_graph_sampler_test_generate_cte_query_directed.txt @@ -0,0 +1,107 @@ +WITH + source_node AS ( + SELECT n0_id AS seed_id, n0_id AS n0_id + FROM GRAPH_TABLE(my_graph + MATCH (n0:N1 WHERE n0.id IN UNNEST(@seed_ids)) + RETURN n0.id AS n0_id + ) +), + hop_1 AS ( + SELECT p.seed_id, gt.edge_json, gt.child_id AS n1_id, p.n0_id AS parent_id + FROM source_node p + JOIN GRAPH_TABLE(my_graph + MATCH (gp:N1)-[ge:E1]->(gc:N1) + FILTER IS_FIRST(2) OVER (PARTITION BY gp.id ORDER BY GENERATE_UUID()) + RETURN gp.id AS parent_id, TO_JSON(ge) AS edge_json, gc.id AS child_id + ) AS gt ON p.n0_id = gt.parent_id +), + hop_2 AS ( + SELECT p.seed_id, gt.edge_json, gt.child_id AS n2_id, p.n1_id AS parent_id + FROM hop_1 p + JOIN GRAPH_TABLE(my_graph + MATCH (gp:N1)-[ge:E1]->(gc:N1) + FILTER IS_FIRST(2) OVER (PARTITION BY gp.id ORDER BY GENERATE_UUID()) + RETURN gp.id AS parent_id, TO_JSON(ge) AS edge_json, gc.id AS child_id + ) AS gt ON p.n1_id = gt.parent_id +), + hop_3 AS ( + SELECT p.seed_id, gt.edge_json, gt.child_id AS n3_id, p.n1_id AS parent_id + FROM hop_1 p + JOIN GRAPH_TABLE(my_graph + MATCH (gp:N1)-[ge:E2]->(gc:N2) + FILTER IS_FIRST(2) OVER (PARTITION BY gp.id ORDER BY GENERATE_UUID()) + RETURN 
gp.id AS parent_id, TO_JSON(ge) AS edge_json, gc.id AS child_id + ) AS gt ON p.n1_id = gt.parent_id +), + hop_4 AS ( + SELECT p.seed_id, gt.edge_json, gt.child_id AS n4_id, p.n0_id AS parent_id + FROM source_node p + JOIN GRAPH_TABLE(my_graph + MATCH (gp:N1)-[ge:E2]->(gc:N2) + FILTER IS_FIRST(2) OVER (PARTITION BY gp.id ORDER BY GENERATE_UUID()) + RETURN gp.id AS parent_id, TO_JSON(ge) AS edge_json, gc.id AS child_id + ) AS gt ON p.n0_id = gt.parent_id +) +SELECT seed_id, node_id, element_type, element_class, source_id, target_id, properties_json FROM ( +SELECT n0_id AS seed_id, n0_id AS node_id, 'node' AS element_type, 'N1' AS element_class, CAST(NULL AS BYTES) AS source_id, CAST(NULL AS BYTES) AS target_id, TO_JSON(n) AS properties_json FROM source_node AS h JOIN N1 AS n ON h.n0_id = n.id +UNION ALL +SELECT seed_id AS seed_id, CAST(NULL AS BYTES) AS node_id, 'edge' AS element_type, 'E1' AS element_class, parent_id AS source_id, n1_id AS target_id, edge_json AS properties_json FROM hop_1 +UNION ALL + + SELECT + seed_id AS seed_id, + n1_id AS node_id, + 'node' AS element_type, + 'N1' AS element_class, + CAST(NULL AS BYTES) AS source_id, + CAST(NULL AS BYTES) AS target_id, + TO_JSON(n) AS properties_json + FROM hop_1 AS h + JOIN N1 AS n ON h.n1_id = n.id + +UNION ALL +SELECT seed_id AS seed_id, CAST(NULL AS BYTES) AS node_id, 'edge' AS element_type, 'E1' AS element_class, parent_id AS source_id, n2_id AS target_id, edge_json AS properties_json FROM hop_2 +UNION ALL + + SELECT + seed_id AS seed_id, + n2_id AS node_id, + 'node' AS element_type, + 'N1' AS element_class, + CAST(NULL AS BYTES) AS source_id, + CAST(NULL AS BYTES) AS target_id, + TO_JSON(n) AS properties_json + FROM hop_2 AS h + JOIN N1 AS n ON h.n2_id = n.id + +UNION ALL +SELECT seed_id AS seed_id, CAST(NULL AS BYTES) AS node_id, 'edge' AS element_type, 'E2' AS element_class, parent_id AS source_id, n3_id AS target_id, edge_json AS properties_json FROM hop_3 +UNION ALL + + SELECT + seed_id AS seed_id, + 
n3_id AS node_id, + 'node' AS element_type, + 'N2' AS element_class, + CAST(NULL AS BYTES) AS source_id, + CAST(NULL AS BYTES) AS target_id, + TO_JSON(n) AS properties_json + FROM hop_3 AS h + JOIN N2 AS n ON h.n3_id = n.id + +UNION ALL +SELECT seed_id AS seed_id, CAST(NULL AS BYTES) AS node_id, 'edge' AS element_type, 'E2' AS element_class, parent_id AS source_id, n4_id AS target_id, edge_json AS properties_json FROM hop_4 +UNION ALL + + SELECT + seed_id AS seed_id, + n4_id AS node_id, + 'node' AS element_type, + 'N2' AS element_class, + CAST(NULL AS BYTES) AS source_id, + CAST(NULL AS BYTES) AS target_id, + TO_JSON(n) AS properties_json + FROM hop_4 AS h + JOIN N2 AS n ON h.n4_id = n.id + +) \ No newline at end of file diff --git a/test_data/spanner_graph_sampler_test_generate_gql_query.txt b/test_data/spanner_graph_sampler_test_generate_gql_query.txt deleted file mode 100644 index 5b82dfa..0000000 --- a/test_data/spanner_graph_sampler_test_generate_gql_query.txt +++ /dev/null @@ -1,56 +0,0 @@ -GRAPH my_graph -MATCH (n0 WHERE n0.id IN ('10', '11')) -OPTIONAL MATCH (n0)-[e1:E1 WHERE e1 IN { - MATCH -[selected_e:E1]->() - FILTER IS_FIRST(2) OVER (PARTITION BY SOURCE_NODE_ID(selected_e) ORDER BY GENERATE_UUID()) - RETURN selected_e - }]->(n1) -[e2:E1 WHERE e2 IN { - MATCH -[selected_e:E1]->() - FILTER IS_FIRST(2) OVER (PARTITION BY SOURCE_NODE_ID(selected_e) ORDER BY GENERATE_UUID()) - RETURN selected_e - }]->(n2) - -OPTIONAL MATCH (n1) <-[e3:E1 WHERE e3 IN { - MATCH <-[selected_e:E1]-() - FILTER IS_FIRST(2) OVER (PARTITION BY DESTINATION_NODE_ID(selected_e) ORDER BY GENERATE_UUID()) - RETURN selected_e - }]-(n3) - -OPTIONAL MATCH (n1) -[e4:E2 WHERE e4 IN { - MATCH -[selected_e:E2]->() - FILTER IS_FIRST(2) OVER (PARTITION BY SOURCE_NODE_ID(selected_e) ORDER BY GENERATE_UUID()) - RETURN selected_e - }]->(n4) - -OPTIONAL MATCH (n0) <-[e5:E1 WHERE e5 IN { - MATCH <-[selected_e:E1]-() - FILTER IS_FIRST(2) OVER (PARTITION BY DESTINATION_NODE_ID(selected_e) ORDER BY 
GENERATE_UUID()) - RETURN selected_e - }]-(n5) -[e6:E1 WHERE e6 IN { - MATCH -[selected_e:E1]->() - FILTER IS_FIRST(2) OVER (PARTITION BY SOURCE_NODE_ID(selected_e) ORDER BY GENERATE_UUID()) - RETURN selected_e - }]->(n6) - -OPTIONAL MATCH (n5) <-[e7:E1 WHERE e7 IN { - MATCH <-[selected_e:E1]-() - FILTER IS_FIRST(2) OVER (PARTITION BY DESTINATION_NODE_ID(selected_e) ORDER BY GENERATE_UUID()) - RETURN selected_e - }]-(n7) - -OPTIONAL MATCH (n5) -[e8:E2 WHERE e8 IN { - MATCH -[selected_e:E2]->() - FILTER IS_FIRST(2) OVER (PARTITION BY SOURCE_NODE_ID(selected_e) ORDER BY GENERATE_UUID()) - RETURN selected_e - }]->(n8) - -OPTIONAL MATCH (n0) -[e9:E2 WHERE e9 IN { - MATCH -[selected_e:E2]->() - FILTER IS_FIRST(2) OVER (PARTITION BY SOURCE_NODE_ID(selected_e) ORDER BY GENERATE_UUID()) - RETURN selected_e - }]->(n9) <-[e10:E2 WHERE e10 IN { - MATCH <-[selected_e:E2]-() - FILTER IS_FIRST(2) OVER (PARTITION BY DESTINATION_NODE_ID(selected_e) ORDER BY GENERATE_UUID()) - RETURN selected_e - }]-(n10) -RETURN DISTINCT TO_JSON(n0) AS n0, TO_JSON(n1) AS n1, TO_JSON(n2) AS n2, TO_JSON(n3) AS n3, TO_JSON(n4) AS n4, TO_JSON(n5) AS n5, TO_JSON(n6) AS n6, TO_JSON(n7) AS n7, TO_JSON(n8) AS n8, TO_JSON(n9) AS n9, TO_JSON(n10) AS n10, TO_JSON(e1) AS e1, TO_JSON(e2) AS e2, TO_JSON(e3) AS e3, TO_JSON(e4) AS e4, TO_JSON(e5) AS e5, TO_JSON(e6) AS e6, TO_JSON(e7) AS e7, TO_JSON(e8) AS e8, TO_JSON(e9) AS e9, TO_JSON(e10) AS e10 \ No newline at end of file diff --git a/test_data/spanner_graph_sampler_test_json_to_in_memory_graphs_cte.json b/test_data/spanner_graph_sampler_test_json_to_in_memory_graphs_cte.json new file mode 100644 index 0000000..976a406 --- /dev/null +++ b/test_data/spanner_graph_sampler_test_json_to_in_memory_graphs_cte.json @@ -0,0 +1,740 @@ +[ + [ + { + "element_definition_name": "nodes", + "identifier": "mW9nYm5fYXJ4aXYubm9kZXMAeJyZMTAAeA==", + "kind": "node", + "labels": [ + "nodes" + ], + "properties": { + "id": "10", + "labels": 24, + "year": 2012 + } + }, + { + 
"element_definition_name": "nodes", + "identifier": "mW9nYm5fYXJ4aXYubm9kZXMAeJyZMTAAeA==", + "kind": "node", + "labels": [ + "nodes" + ], + "properties": { + "id": "10", + "labels": 24, + "year": 2012 + } + } + ], + [ + { + "element_definition_name": "nodes", + "identifier": "mW9nYm5fYXJ4aXYubm9kZXMAeJyZMTAAeA==", + "kind": "node", + "labels": [ + "nodes" + ], + "properties": { + "id": "10", + "labels": 24, + "year": 2012 + } + }, + { + "element_definition_name": "nodes", + "identifier": "mW9nYm5fYXJ4aXYubm9kZXMAeJyZMTAwOTAAeA==", + "kind": "node", + "labels": [ + "nodes" + ], + "properties": { + "id": "10090", + "labels": 34, + "year": 2007 + } + } + ], + [ + { + "element_definition_name": "nodes", + "identifier": "mW9nYm5fYXJ4aXYubm9kZXMAeJyZMTAAeA==", + "kind": "node", + "labels": [ + "nodes" + ], + "properties": { + "id": "10", + "labels": 24, + "year": 2012 + } + }, + { + "element_definition_name": "nodes", + "identifier": "mW9nYm5fYXJ4aXYubm9kZXMAeJyZOTIzMzEAeA==", + "kind": "node", + "labels": [ + "nodes" + ], + "properties": { + "id": "92331", + "labels": 8, + "year": 2013 + } + } + ], + [ + { + "element_definition_name": "nodes", + "identifier": "mW9nYm5fYXJ4aXYubm9kZXMAeJyZMTAAeA==", + "kind": "node", + "labels": [ + "nodes" + ], + "properties": { + "id": "10", + "labels": 24, + "year": 2012 + } + }, + { + "destination_node_identifier": "mW9nYm5fYXJ4aXYubm9kZXMAeJyZMTAwOTAAeA==", + "element_definition_name": "edges", + "identifier": "mW9nYm5fYXJ4aXYuZWRnZXMAeJyZMTAAeJyZMTAwOTAAeJlvZ2JuX2FyeGl2Lm5vZGVzAHicmTEwAHiZb2dibl9hcnhpdi5ub2RlcwB4nJkxMDA5MAB4", + "kind": "edge", + "labels": [ + "edges" + ], + "properties": { + "id": "10", + "target_id": "10090" + }, + "source_node_identifier": "mW9nYm5fYXJ4aXYubm9kZXMAeJyZMTAAeA==" + } + ], + [ + { + "element_definition_name": "nodes", + "identifier": "mW9nYm5fYXJ4aXYubm9kZXMAeJyZMTAAeA==", + "kind": "node", + "labels": [ + "nodes" + ], + "properties": { + "id": "10", + "labels": 24, + "year": 2012 + } + }, + { + 
"destination_node_identifier": "mW9nYm5fYXJ4aXYubm9kZXMAeJyZMTAAeA==", + "element_definition_name": "edges", + "identifier": "mW9nYm5fYXJ4aXYuZWRnZXMAeJyZOTIzMzEAeJyZMTAAeJlvZ2JuX2FyeGl2Lm5vZGVzAHicmTkyMzMxAHiZb2dibl9hcnhpdi5ub2RlcwB4nJkxMAB4", + "kind": "edge", + "labels": [ + "edges" + ], + "properties": { + "id": "92331", + "target_id": "10" + }, + "source_node_identifier": "mW9nYm5fYXJ4aXYubm9kZXMAeJyZOTIzMzEAeA==" + } + ], + [ + { + "element_definition_name": "nodes", + "identifier": "mW9nYm5fYXJ4aXYubm9kZXMAeJyZMTAAeA==", + "kind": "node", + "labels": [ + "nodes" + ], + "properties": { + "id": "10", + "labels": 24, + "year": 2012 + } + }, + { + "element_definition_name": "nodes", + "identifier": "mW9nYm5fYXJ4aXYubm9kZXMAeJyZMTAAeA==", + "kind": "node", + "labels": [ + "nodes" + ], + "properties": { + "id": "10", + "labels": 24, + "year": 2012 + } + } + ], + [ + { + "element_definition_name": "nodes", + "identifier": "mW9nYm5fYXJ4aXYubm9kZXMAeJyZMTAAeA==", + "kind": "node", + "labels": [ + "nodes" + ], + "properties": { + "id": "10", + "labels": 24, + "year": 2012 + } + }, + { + "element_definition_name": "nodes", + "identifier": "mW9nYm5fYXJ4aXYubm9kZXMAeJyZMjUzNTAAeA==", + "kind": "node", + "labels": [ + "nodes" + ], + "properties": { + "id": "25350", + "labels": 8, + "year": 2011 + } + } + ], + [ + { + "element_definition_name": "nodes", + "identifier": "mW9nYm5fYXJ4aXYubm9kZXMAeJyZMTAAeA==", + "kind": "node", + "labels": [ + "nodes" + ], + "properties": { + "id": "10", + "labels": 24, + "year": 2012 + } + }, + { + "element_definition_name": "nodes", + "identifier": "mW9nYm5fYXJ4aXYubm9kZXMAeJyZOTIzMzEAeA==", + "kind": "node", + "labels": [ + "nodes" + ], + "properties": { + "id": "92331", + "labels": 8, + "year": 2013 + } + } + ], + [ + { + "element_definition_name": "nodes", + "identifier": "mW9nYm5fYXJ4aXYubm9kZXMAeJyZMTAAeA==", + "kind": "node", + "labels": [ + "nodes" + ], + "properties": { + "id": "10", + "labels": 24, + "year": 2012 + } + }, + { + 
"destination_node_identifier": "mW9nYm5fYXJ4aXYubm9kZXMAeJyZMjUzNTAAeA==", + "element_definition_name": "edges", + "identifier": "mW9nYm5fYXJ4aXYuZWRnZXMAeJyZMTAAeJyZMjUzNTAAeJlvZ2JuX2FyeGl2Lm5vZGVzAHicmTEwAHiZb2dibl9hcnhpdi5ub2RlcwB4nJkyNTM1MAB4", + "kind": "edge", + "labels": [ + "edges" + ], + "properties": { + "id": "10", + "target_id": "25350" + }, + "source_node_identifier": "mW9nYm5fYXJ4aXYubm9kZXMAeJyZMTAAeA==" + } + ], + [ + { + "element_definition_name": "nodes", + "identifier": "mW9nYm5fYXJ4aXYubm9kZXMAeJyZMTAAeA==", + "kind": "node", + "labels": [ + "nodes" + ], + "properties": { + "id": "10", + "labels": 24, + "year": 2012 + } + }, + { + "destination_node_identifier": "mW9nYm5fYXJ4aXYubm9kZXMAeJyZMTAAeA==", + "element_definition_name": "edges", + "identifier": "mW9nYm5fYXJ4aXYuZWRnZXMAeJyZOTIzMzEAeJyZMTAAeJlvZ2JuX2FyeGl2Lm5vZGVzAHicmTkyMzMxAHiZb2dibl9hcnhpdi5ub2RlcwB4nJkxMAB4", + "kind": "edge", + "labels": [ + "edges" + ], + "properties": { + "id": "92331", + "target_id": "10" + }, + "source_node_identifier": "mW9nYm5fYXJ4aXYubm9kZXMAeJyZOTIzMzEAeA==" + } + ], + [ + { + "element_definition_name": "nodes", + "identifier": "mW9nYm5fYXJ4aXYubm9kZXMAeJyZMTAAeA==", + "kind": "node", + "labels": [ + "nodes" + ], + "properties": { + "id": "10", + "labels": 24, + "year": 2012 + } + }, + { + "element_definition_name": "nodes", + "identifier": "mW9nYm5fYXJ4aXYubm9kZXMAeJyZMTAAeA==", + "kind": "node", + "labels": [ + "nodes" + ], + "properties": { + "id": "10", + "labels": 24, + "year": 2012 + } + } + ], + [ + { + "element_definition_name": "nodes", + "identifier": "mW9nYm5fYXJ4aXYubm9kZXMAeJyZMTAAeA==", + "kind": "node", + "labels": [ + "nodes" + ], + "properties": { + "id": "10", + "labels": 24, + "year": 2012 + } + }, + { + "element_definition_name": "nodes", + "identifier": "mW9nYm5fYXJ4aXYubm9kZXMAeJyZMTAwOTAAeA==", + "kind": "node", + "labels": [ + "nodes" + ], + "properties": { + "id": "10090", + "labels": 34, + "year": 2007 + } + } + ], + [ + { + 
"element_definition_name": "nodes", + "identifier": "mW9nYm5fYXJ4aXYubm9kZXMAeJyZMTAAeA==", + "kind": "node", + "labels": [ + "nodes" + ], + "properties": { + "id": "10", + "labels": 24, + "year": 2012 + } + }, + { + "element_definition_name": "nodes", + "identifier": "mW9nYm5fYXJ4aXYubm9kZXMAeJyZNDY1MTAAeA==", + "kind": "node", + "labels": [ + "nodes" + ], + "properties": { + "id": "46510", + "labels": 24, + "year": 2013 + } + } + ], + [ + { + "element_definition_name": "nodes", + "identifier": "mW9nYm5fYXJ4aXYubm9kZXMAeJyZMTAAeA==", + "kind": "node", + "labels": [ + "nodes" + ], + "properties": { + "id": "10", + "labels": 24, + "year": 2012 + } + }, + { + "destination_node_identifier": "mW9nYm5fYXJ4aXYubm9kZXMAeJyZMTAwOTAAeA==", + "element_definition_name": "edges", + "identifier": "mW9nYm5fYXJ4aXYuZWRnZXMAeJyZMTAAeJyZMTAwOTAAeJlvZ2JuX2FyeGl2Lm5vZGVzAHicmTEwAHiZb2dibl9hcnhpdi5ub2RlcwB4nJkxMDA5MAB4", + "kind": "edge", + "labels": [ + "edges" + ], + "properties": { + "id": "10", + "target_id": "10090" + }, + "source_node_identifier": "mW9nYm5fYXJ4aXYubm9kZXMAeJyZMTAAeA==" + } + ], + [ + { + "element_definition_name": "nodes", + "identifier": "mW9nYm5fYXJ4aXYubm9kZXMAeJyZMTAAeA==", + "kind": "node", + "labels": [ + "nodes" + ], + "properties": { + "id": "10", + "labels": 24, + "year": 2012 + } + }, + { + "destination_node_identifier": "mW9nYm5fYXJ4aXYubm9kZXMAeJyZMTAAeA==", + "element_definition_name": "edges", + "identifier": "mW9nYm5fYXJ4aXYuZWRnZXMAeJyZNDY1MTAAeJyZMTAAeJlvZ2JuX2FyeGl2Lm5vZGVzAHicmTQ2NTEwAHiZb2dibl9hcnhpdi5ub2RlcwB4nJkxMAB4", + "kind": "edge", + "labels": [ + "edges" + ], + "properties": { + "id": "46510", + "target_id": "10" + }, + "source_node_identifier": "mW9nYm5fYXJ4aXYubm9kZXMAeJyZNDY1MTAAeA==" + } + ], + [ + { + "element_definition_name": "nodes", + "identifier": "mW9nYm5fYXJ4aXYubm9kZXMAeJyZMTAAeA==", + "kind": "node", + "labels": [ + "nodes" + ], + "properties": { + "id": "10", + "labels": 24, + "year": 2012 + } + }, + { + 
"element_definition_name": "nodes", + "identifier": "mW9nYm5fYXJ4aXYubm9kZXMAeJyZMTAAeA==", + "kind": "node", + "labels": [ + "nodes" + ], + "properties": { + "id": "10", + "labels": 24, + "year": 2012 + } + } + ], + [ + { + "element_definition_name": "nodes", + "identifier": "mW9nYm5fYXJ4aXYubm9kZXMAeJyZMTAAeA==", + "kind": "node", + "labels": [ + "nodes" + ], + "properties": { + "id": "10", + "labels": 24, + "year": 2012 + } + }, + { + "element_definition_name": "nodes", + "identifier": "mW9nYm5fYXJ4aXYubm9kZXMAeJyZMjUzNTAAeA==", + "kind": "node", + "labels": [ + "nodes" + ], + "properties": { + "id": "25350", + "labels": 8, + "year": 2011 + } + } + ], + [ + { + "element_definition_name": "nodes", + "identifier": "mW9nYm5fYXJ4aXYubm9kZXMAeJyZMTAAeA==", + "kind": "node", + "labels": [ + "nodes" + ], + "properties": { + "id": "10", + "labels": 24, + "year": 2012 + } + }, + { + "element_definition_name": "nodes", + "identifier": "mW9nYm5fYXJ4aXYubm9kZXMAeJyZNDY1MTAAeA==", + "kind": "node", + "labels": [ + "nodes" + ], + "properties": { + "id": "46510", + "labels": 24, + "year": 2013 + } + } + ], + [ + { + "element_definition_name": "nodes", + "identifier": "mW9nYm5fYXJ4aXYubm9kZXMAeJyZMTAAeA==", + "kind": "node", + "labels": [ + "nodes" + ], + "properties": { + "id": "10", + "labels": 24, + "year": 2012 + } + }, + { + "destination_node_identifier": "mW9nYm5fYXJ4aXYubm9kZXMAeJyZMjUzNTAAeA==", + "element_definition_name": "edges", + "identifier": "mW9nYm5fYXJ4aXYuZWRnZXMAeJyZMTAAeJyZMjUzNTAAeJlvZ2JuX2FyeGl2Lm5vZGVzAHicmTEwAHiZb2dibl9hcnhpdi5ub2RlcwB4nJkyNTM1MAB4", + "kind": "edge", + "labels": [ + "edges" + ], + "properties": { + "id": "10", + "target_id": "25350" + }, + "source_node_identifier": "mW9nYm5fYXJ4aXYubm9kZXMAeJyZMTAAeA==" + } + ], + [ + { + "element_definition_name": "nodes", + "identifier": "mW9nYm5fYXJ4aXYubm9kZXMAeJyZMTAAeA==", + "kind": "node", + "labels": [ + "nodes" + ], + "properties": { + "id": "10", + "labels": 24, + "year": 2012 + } + }, + { + 
"destination_node_identifier": "mW9nYm5fYXJ4aXYubm9kZXMAeJyZMTAAeA==", + "element_definition_name": "edges", + "identifier": "mW9nYm5fYXJ4aXYuZWRnZXMAeJyZNDY1MTAAeJyZMTAAeJlvZ2JuX2FyeGl2Lm5vZGVzAHicmTQ2NTEwAHiZb2dibl9hcnhpdi5ub2RlcwB4nJkxMAB4", + "kind": "edge", + "labels": [ + "edges" + ], + "properties": { + "id": "46510", + "target_id": "10" + }, + "source_node_identifier": "mW9nYm5fYXJ4aXYubm9kZXMAeJyZNDY1MTAAeA==" + } + ], + [ + { + "element_definition_name": "nodes", + "identifier": "mW9nYm5fYXJ4aXYubm9kZXMAeJyZMTEAeA==", + "kind": "node", + "labels": [ + "nodes" + ], + "properties": { + "id": "11", + "labels": 36, + "year": 2015 + } + }, + { + "element_definition_name": "nodes", + "identifier": "mW9nYm5fYXJ4aXYubm9kZXMAeJyZMTEAeA==", + "kind": "node", + "labels": [ + "nodes" + ], + "properties": { + "id": "11", + "labels": 36, + "year": 2015 + } + } + ], + [ + { + "element_definition_name": "nodes", + "identifier": "mW9nYm5fYXJ4aXYubm9kZXMAeJyZMTEAeA==", + "kind": "node", + "labels": [ + "nodes" + ], + "properties": { + "id": "11", + "labels": 36, + "year": 2015 + } + }, + { + "element_definition_name": "nodes", + "identifier": "mW9nYm5fYXJ4aXYubm9kZXMAeJyZNTQwMzUAeA==", + "kind": "node", + "labels": [ + "nodes" + ], + "properties": { + "id": "54035", + "labels": 36, + "year": 2012 + } + } + ], + [ + { + "element_definition_name": "nodes", + "identifier": "mW9nYm5fYXJ4aXYubm9kZXMAeJyZMTEAeA==", + "kind": "node", + "labels": [ + "nodes" + ], + "properties": { + "id": "11", + "labels": 36, + "year": 2015 + } + }, + { + "destination_node_identifier": "mW9nYm5fYXJ4aXYubm9kZXMAeJyZNTQwMzUAeA==", + "element_definition_name": "edges", + "identifier": "mW9nYm5fYXJ4aXYuZWRnZXMAeJyZMTEAeJyZNTQwMzUAeJlvZ2JuX2FyeGl2Lm5vZGVzAHicmTExAHiZb2dibl9hcnhpdi5ub2RlcwB4nJk1NDAzNQB4", + "kind": "edge", + "labels": [ + "edges" + ], + "properties": { + "id": "11", + "target_id": "54035" + }, + "source_node_identifier": "mW9nYm5fYXJ4aXYubm9kZXMAeJyZMTEAeA==" + } + ], + [ + { + 
"element_definition_name": "nodes", + "identifier": "mW9nYm5fYXJ4aXYubm9kZXMAeJyZMTEAeA==", + "kind": "node", + "labels": [ + "nodes" + ], + "properties": { + "id": "11", + "labels": 36, + "year": 2015 + } + }, + { + "element_definition_name": "nodes", + "identifier": "mW9nYm5fYXJ4aXYubm9kZXMAeJyZMTEAeA==", + "kind": "node", + "labels": [ + "nodes" + ], + "properties": { + "id": "11", + "labels": 36, + "year": 2015 + } + } + ], + [ + { + "element_definition_name": "nodes", + "identifier": "mW9nYm5fYXJ4aXYubm9kZXMAeJyZMTEAeA==", + "kind": "node", + "labels": [ + "nodes" + ], + "properties": { + "id": "11", + "labels": 36, + "year": 2015 + } + }, + { + "element_definition_name": "nodes", + "identifier": "mW9nYm5fYXJ4aXYubm9kZXMAeJyZMTQyNTkxAHg=", + "kind": "node", + "labels": [ + "nodes" + ], + "properties": { + "id": "142591", + "labels": 36, + "year": 2012 + } + } + ], + [ + { + "element_definition_name": "nodes", + "identifier": "mW9nYm5fYXJ4aXYubm9kZXMAeJyZMTEAeA==", + "kind": "node", + "labels": [ + "nodes" + ], + "properties": { + "id": "11", + "labels": 36, + "year": 2015 + } + }, + { + "destination_node_identifier": "mW9nYm5fYXJ4aXYubm9kZXMAeJyZMTQyNTkxAHg=", + "element_definition_name": "edges", + "identifier": "mW9nYm5fYXJ4aXYuZWRnZXMAeJyZMTEAeJyZMTQyNTkxAHiZb2dibl9hcnhpdi5ub2RlcwB4nJkxMQB4mW9nYm5fYXJ4aXYubm9kZXMAeJyZMTQyNTkxAHg=", + "kind": "edge", + "labels": [ + "edges" + ], + "properties": { + "id": "11", + "target_id": "142591" + }, + "source_node_identifier": "mW9nYm5fYXJ4aXYubm9kZXMAeJyZMTEAeA==" + } + ] +] \ No newline at end of file