Skip to content

Commit 2d6a1b9

Browse files
authored
feat: Add DeleteFileIndex to improve position delete lookup (#2918)
Related to #2255. # Rationale for this change This PR is a piece of the existing DeleteFileIndex (DFI) PR in #2255. However, it rips out the existing delete->data matching behavior and instead indexes the delete files for efficient lookup. The previous implementation: 1. Scanned all delete files with sequence number >= data file's sequence number 2. Created a new `_InclusiveMetricsEvaluator` instance for each data file 3. Evaluated every candidate delete file against the data file's path Now we extend this workflow with a `DeleteFileIndex` that: - Indexes path-specific DVs - Indexes partition-scoped deletes by (spec_id, partition record) - Uses bisect_left for sequence number filtering This aligns with the Java implementation of the [DeleteFileIndex](https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java), while following the existing Python infrastructure. ## Are these changes tested? New tests were added and existing tests continue to pass. ## Are there any user-facing changes? No
1 parent 26ecfe7 commit 2d6a1b9

4 files changed

Lines changed: 339 additions & 152 deletions

File tree

pyiceberg/table/__init__.py

Lines changed: 7 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
)
3434

3535
from pydantic import Field
36-
from sortedcontainers import SortedList
3736

3837
import pyiceberg.expressions.parser as parser
3938
from pyiceberg.expressions import (
@@ -56,7 +55,6 @@
5655
)
5756
from pyiceberg.io import FileIO, load_file_io
5857
from pyiceberg.manifest import (
59-
POSITIONAL_DELETE_SCHEMA,
6058
DataFile,
6159
DataFileContent,
6260
ManifestContent,
@@ -70,6 +68,7 @@
7068
PartitionSpec,
7169
)
7270
from pyiceberg.schema import Schema
71+
from pyiceberg.table.delete_file_index import DeleteFileIndex
7372
from pyiceberg.table.inspect import InspectTable
7473
from pyiceberg.table.locations import LocationProvider, load_location_provider
7574
from pyiceberg.table.maintenance import MaintenanceTable
@@ -1951,31 +1950,6 @@ def _min_sequence_number(manifests: list[ManifestFile]) -> int:
19511950
return INITIAL_SEQUENCE_NUMBER
19521951

19531952

1954-
def _match_deletes_to_data_file(data_entry: ManifestEntry, positional_delete_entries: SortedList[ManifestEntry]) -> set[DataFile]:
1955-
"""Check if the delete file is relevant for the data file.
1956-
1957-
Using the column metrics to see if the filename is in the lower and upper bound.
1958-
1959-
Args:
1960-
data_entry (ManifestEntry): The manifest entry path of the datafile.
1961-
positional_delete_entries (List[ManifestEntry]): All the candidate positional deletes manifest entries.
1962-
1963-
Returns:
1964-
A set of files that are relevant for the data file.
1965-
"""
1966-
relevant_entries = positional_delete_entries[positional_delete_entries.bisect_right(data_entry) :]
1967-
1968-
if len(relevant_entries) > 0:
1969-
evaluator = _InclusiveMetricsEvaluator(POSITIONAL_DELETE_SCHEMA, EqualTo("file_path", data_entry.data_file.file_path))
1970-
return {
1971-
positional_delete_entry.data_file
1972-
for positional_delete_entry in relevant_entries
1973-
if evaluator.eval(positional_delete_entry.data_file)
1974-
}
1975-
else:
1976-
return set()
1977-
1978-
19791953
class DataScan(TableScan):
19801954
def _build_partition_projection(self, spec_id: int) -> BooleanExpression:
19811955
project = inclusive_projection(self.table_metadata.schema(), self.table_metadata.specs()[spec_id], self.case_sensitive)
@@ -2120,7 +2094,7 @@ def _plan_files_server_side(self) -> Iterable[FileScanTask]:
21202094
def _plan_files_local(self) -> Iterable[FileScanTask]:
21212095
"""Plan files locally by reading manifests."""
21222096
data_entries: list[ManifestEntry] = []
2123-
positional_delete_entries = SortedList(key=lambda entry: entry.sequence_number or INITIAL_SEQUENCE_NUMBER)
2097+
delete_index = DeleteFileIndex()
21242098

21252099
residual_evaluators: dict[int, Callable[[DataFile], ResidualEvaluator]] = KeyDefaultDict(self._build_residual_evaluator)
21262100

@@ -2129,18 +2103,18 @@ def _plan_files_local(self) -> Iterable[FileScanTask]:
21292103
if data_file.content == DataFileContent.DATA:
21302104
data_entries.append(manifest_entry)
21312105
elif data_file.content == DataFileContent.POSITION_DELETES:
2132-
positional_delete_entries.add(manifest_entry)
2106+
delete_index.add_delete_file(manifest_entry, partition_key=data_file.partition)
21332107
elif data_file.content == DataFileContent.EQUALITY_DELETES:
21342108
raise ValueError("PyIceberg does not yet support equality deletes: https://github.com/apache/iceberg/issues/6568")
21352109
else:
21362110
raise ValueError(f"Unknown DataFileContent ({data_file.content}): {manifest_entry}")
2137-
21382111
return [
21392112
FileScanTask(
21402113
data_entry.data_file,
2141-
delete_files=_match_deletes_to_data_file(
2142-
data_entry,
2143-
positional_delete_entries,
2114+
delete_files=delete_index.for_data_file(
2115+
data_entry.sequence_number or INITIAL_SEQUENCE_NUMBER,
2116+
data_entry.data_file,
2117+
partition_key=data_entry.data_file.partition,
21442118
),
21452119
residual=residual_evaluators[data_entry.data_file.spec_id](data_entry.data_file).residual_for(
21462120
data_entry.data_file.partition
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
from __future__ import annotations
18+
19+
from bisect import bisect_left
20+
21+
from pyiceberg.expressions import EqualTo
22+
from pyiceberg.expressions.visitors import _InclusiveMetricsEvaluator
23+
from pyiceberg.manifest import INITIAL_SEQUENCE_NUMBER, POSITIONAL_DELETE_SCHEMA, DataFile, ManifestEntry
24+
from pyiceberg.typedef import Record
25+
26+
PATH_FIELD_ID = 2147483546
27+
28+
29+
class PositionDeletes:
    """Accumulate position delete files, then serve them ordered by sequence number.

    Files are first collected in a pending buffer. The first query sorts the
    buffer by sequence number and discards it; from then on the collection is
    frozen and ``add`` raises.
    """

    __slots__ = ("_buffer", "_seqs", "_files")

    def __init__(self) -> None:
        # Pending (delete_file, seq) pairs; set to None once the index is built.
        self._buffer: list[tuple[DataFile, int]] | None = []
        # Sequence numbers in ascending order, parallel to ``_files``.
        self._seqs: list[int] = []
        # (delete_file, seq) pairs sorted by sequence number.
        self._files: list[tuple[DataFile, int]] = []

    def add(self, delete_file: DataFile, seq_num: int) -> None:
        """Register a delete file together with its sequence number.

        Args:
            delete_file: The delete file to collect.
            seq_num: The sequence number the file is ordered by.

        Raises:
            ValueError: If the collection has already been indexed.
        """
        pending = self._buffer
        if pending is None:
            raise ValueError("Cannot add files after indexing")
        pending.append((delete_file, seq_num))

    def _ensure_indexed(self) -> None:
        """Build the sorted view on first use; later calls are no-ops."""
        pending = self._buffer
        if pending is None:
            return
        ordered = list(pending)
        # list.sort is stable, so files sharing a sequence number keep insertion order.
        ordered.sort(key=lambda pair: pair[1])
        self._files = ordered
        self._seqs = [pair[1] for pair in ordered]
        self._buffer = None

    def filter_by_seq(self, seq: int) -> list[DataFile]:
        """Return the delete files whose sequence number is at least ``seq``.

        Args:
            seq: The lowest sequence number to include.

        Returns:
            The matching delete files in ascending sequence-number order, or
            an empty list when nothing was collected.
        """
        self._ensure_indexed()
        # bisect_left lands on the first entry with sequence number >= seq.
        first = bisect_left(self._seqs, seq) if self._files else 0
        return [pair[0] for pair in self._files[first:]]
56+
57+
58+
def _has_path_bounds(delete_file: DataFile) -> bool:
    """Tell whether a delete file has both a lower and an upper bound for the path field.

    Args:
        delete_file: The delete file whose column bounds are inspected.

    Returns:
        True when both bound mappings are non-empty and each contains
        ``PATH_FIELD_ID``; False otherwise.
    """
    lower_bounds = delete_file.lower_bounds
    upper_bounds = delete_file.upper_bounds
    if lower_bounds and upper_bounds:
        return PATH_FIELD_ID in lower_bounds and PATH_FIELD_ID in upper_bounds
    return False
65+
66+
67+
def _applies_to_data_file(delete_file: DataFile, data_file: DataFile) -> bool:
    """Decide whether a delete file may hold deletes for the given data file.

    Args:
        delete_file: The candidate position delete file.
        data_file: The data file being planned.

    Returns:
        True when the delete file has no path bounds (so it cannot be ruled
        out), otherwise the result of evaluating ``file_path == data_file.file_path``
        against the delete file's metrics.
    """
    if _has_path_bounds(delete_file):
        path_filter = EqualTo("file_path", data_file.file_path)
        return _InclusiveMetricsEvaluator(POSITIONAL_DELETE_SCHEMA, path_filter).eval(delete_file)
    # No path bounds to prune with: keep the delete file as a candidate.
    return True
73+
74+
75+
def _referenced_data_file_path(delete_file: DataFile) -> str | None:
    """Return the single data file path a delete file points at, if it has one.

    The path is taken from the bounds of the path field: when the lower and
    upper bound are both present and equal, that value is the path.

    Args:
        delete_file: The delete file whose path bounds are inspected.

    Returns:
        The UTF-8 decoded path when the lower and upper path bounds are equal,
        or None when bounds are missing, differ, or cannot be decoded.
    """
    lower_bounds, upper_bounds = delete_file.lower_bounds, delete_file.upper_bounds
    if not (lower_bounds and upper_bounds):
        return None

    path_lower = lower_bounds.get(PATH_FIELD_ID)
    path_upper = upper_bounds.get(PATH_FIELD_ID)
    if not (path_lower and path_upper and path_lower == path_upper):
        return None

    try:
        return path_lower.decode("utf-8")
    except (UnicodeDecodeError, AttributeError):
        # Best effort: a bound that cannot be decoded is treated as "no single path".
        return None
93+
94+
95+
def _partition_key(spec_id: int, partition: Record | None) -> tuple[int, Record]:
    """Build the dictionary key used to group deletes by partition.

    Args:
        spec_id: The partition spec id.
        partition: The partition record; may be None or falsy.

    Returns:
        ``(spec_id, partition)``, with a fresh empty ``Record`` standing in
        for a falsy partition (the unpartitioned case).
    """
    record = partition if partition else Record()  # unpartitioned handling
    return spec_id, record
99+
100+
101+
class DeleteFileIndex:
    """Index position delete files by partition and by exact data file path.

    A delete file whose path bounds resolve to a single data file is stored
    under that path; every other delete file is stored under its
    ``(spec_id, partition)`` key.
    """

    def __init__(self) -> None:
        # Deletes grouped by (spec_id, partition record).
        self._by_partition: dict[tuple[int, Record], PositionDeletes] = {}
        # Deletes that reference exactly one data file, keyed by that file's path.
        self._by_path: dict[str, PositionDeletes] = {}

    def is_empty(self) -> bool:
        """Return True when no delete file has been added."""
        return not (self._by_partition or self._by_path)

    def add_delete_file(self, manifest_entry: ManifestEntry, partition_key: Record | None = None) -> None:
        """Add the delete file of a manifest entry to the index.

        Args:
            manifest_entry: The entry holding the delete file and its sequence number.
            partition_key: The partition record to group the file under when it
                does not reference a single data file path.
        """
        delete_file = manifest_entry.data_file
        seq = manifest_entry.sequence_number or INITIAL_SEQUENCE_NUMBER
        target_path = _referenced_data_file_path(delete_file)

        if target_path:
            bucket = self._by_path.setdefault(target_path, PositionDeletes())
        else:
            bucket = self._by_partition.setdefault(
                _partition_key(delete_file.spec_id or 0, partition_key),
                PositionDeletes(),
            )
        bucket.add(delete_file, seq)

    def for_data_file(self, seq_num: int, data_file: DataFile, partition_key: Record | None = None) -> set[DataFile]:
        """Collect the delete files that apply to a data file.

        Args:
            seq_num: The data file's sequence number; only delete files with a
                sequence number at least this large are considered.
            data_file: The data file being planned.
            partition_key: The data file's partition record.

        Returns:
            The union of the matching partition-scoped deletes (filtered by
            their path bounds) and the deletes indexed under the data file's path.
        """
        if self.is_empty():
            return set()

        matched: set[DataFile] = set()

        partition_bucket = self._by_partition.get(_partition_key(data_file.spec_id or 0, partition_key))
        if partition_bucket:
            matched.update(
                candidate
                for candidate in partition_bucket.filter_by_seq(seq_num)
                if _applies_to_data_file(candidate, data_file)
            )

        path_bucket = self._by_path.get(data_file.file_path)
        if path_bucket:
            matched.update(path_bucket.filter_by_seq(seq_num))

        return matched

0 commit comments

Comments
 (0)