-
Notifications
You must be signed in to change notification settings - Fork 26
RDBC-1037 RDBC-1038 Fix load_into_stream and implement stream_into/to_stream #278
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: v7.2
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -9,6 +9,7 @@ | |
| import time | ||
| import uuid | ||
| from typing import ( | ||
| BinaryIO, | ||
| Union, | ||
| Callable, | ||
| TYPE_CHECKING, | ||
|
|
@@ -367,21 +368,23 @@ def _load_internal( | |
|
|
||
| return load_operation.get_documents(object_type) | ||
|
|
||
| def _load_internal_stream(self, keys: List[str], operation: LoadOperation, stream: Optional[bytes] = None) -> None: | ||
| def _load_internal_stream( | ||
| self, keys: List[str], operation: LoadOperation, stream: Optional[BinaryIO] = None | ||
| ) -> None: | ||
| operation.by_keys(keys) | ||
|
|
||
| command = operation.create_request() | ||
|
|
||
| if command: | ||
| self._request_executor.execute_command(command, self.session_info) | ||
|
|
||
| if stream: | ||
| if stream is not None: | ||
| try: | ||
| result = command.result | ||
| stream_to_dict = json.loads(stream.decode("utf-8")) | ||
| result.__dict__.update(stream_to_dict) | ||
| except IOError as e: | ||
| raise RuntimeError(f"Unable to serialize returned value into stream {e.args[0]}", e) | ||
| data = json.dumps({k: v for k, v in result.to_json().items() if v is not None}).encode("utf-8") | ||
| stream.write(data) | ||
| except Exception as e: | ||
| raise RuntimeError("Unable to serialize returned value into stream") from e | ||
| else: | ||
| operation.set_result(command.result) | ||
|
|
||
|
|
@@ -404,16 +407,19 @@ def load_starting_with( | |
| def load_starting_with_into_stream( | ||
| self, | ||
| id_prefix: str, | ||
| output: BinaryIO, | ||
| matches: str = None, | ||
| start: int = 0, | ||
| page_size: int = 25, | ||
| exclude: str = None, | ||
| start_after: str = None, | ||
| ) -> bytes: | ||
| ) -> None: | ||
| if id_prefix is None: | ||
| raise ValueError("Arg 'id_prefix' is cannot be None.") | ||
| return self._load_starting_with_into_stream_internal( | ||
| id_prefix, LoadStartingWithOperation(self), matches, start, page_size, exclude, start_after | ||
| raise ValueError("id_prefix cannot be None.") | ||
| if output is None: | ||
| raise ValueError("output cannot be None") | ||
| self._load_starting_with_into_stream_internal( | ||
| id_prefix, LoadStartingWithOperation(self), output, matches, start, page_size, exclude, start_after | ||
| ) | ||
|
|
||
| def _load_starting_with_internal( | ||
|
|
@@ -437,23 +443,23 @@ def _load_starting_with_into_stream_internal( | |
| self, | ||
| id_prefix: str, | ||
| operation: LoadStartingWithOperation, | ||
| output: BinaryIO, | ||
| matches: str, | ||
| start: int, | ||
| page_size: int, | ||
| exclude: str, | ||
| start_after: str, | ||
| ) -> bytes: | ||
| ) -> None: | ||
| operation.with_start_with(id_prefix, matches, start, page_size, exclude, start_after) | ||
| command = operation.create_request() | ||
| bytes_result = None | ||
| if command: | ||
| self.request_executor.execute_command(command, self.session_info) | ||
| try: | ||
| result = command.result | ||
| bytes_result = json.dumps(result.to_json()).encode("utf-8") | ||
| data = json.dumps({k: v for k, v in result.to_json().items() if v is not None}).encode("utf-8") | ||
| output.write(data) | ||
| except Exception as e: | ||
| raise RuntimeError("Unable sto serialize returned value into stream") from e | ||
| return bytes_result | ||
| raise RuntimeError("Unable to serialize returned value into stream") from e | ||
|
|
||
| def document_query_from_index_type(self, index_type: Type[_TIndex], object_type: Type[_T]) -> DocumentQuery[_T]: | ||
| try: | ||
|
|
@@ -873,19 +879,29 @@ def load_starting_with( | |
| def load_starting_with_into_stream( | ||
| self, | ||
| id_prefix: str, | ||
| output: BinaryIO, | ||
| matches: str = None, | ||
| start: int = 0, | ||
| page_size: int = 25, | ||
| exclude: str = None, | ||
| start_after: str = None, | ||
| ) -> bytes: | ||
| ) -> None: | ||
| return self._session.load_starting_with_into_stream( | ||
| id_prefix, matches, start, page_size, exclude, start_after | ||
| id_prefix, output, matches, start, page_size, exclude, start_after | ||
| ) | ||
|
|
||
| def load_into_stream(self, keys: List[str], output: bytes) -> None: | ||
| def load_into_stream(self, keys: List[str], output: BinaryIO) -> None: | ||
| """Load documents by keys and write the raw JSON response to a binary stream. | ||
|
|
||
| Use this instead of ``load()`` when you need the server response as raw bytes | ||
| (e.g. forwarding to a file or HTTP response) without deserializing into entities. | ||
| ``output`` must be a binary-writable stream such as ``io.BytesIO`` or a file | ||
| opened with ``open(..., "wb")``. | ||
| """ | ||
| if keys is None: | ||
| raise ValueError("Keys cannot be None") | ||
| if output is None: | ||
| raise ValueError("output cannot be None") | ||
|
|
||
| self._session._load_internal_stream(keys, LoadOperation(self._session), output) | ||
|
|
||
|
|
@@ -1172,8 +1188,21 @@ def _yield_result(self, query: AbstractDocumentQuery, enumerator: Iterator[Dict] | |
| object_type=query.query_class, | ||
| ) | ||
|
|
||
| def stream_into(self): # query: Union[DocumentQuery, RawDocumentQuery], output: iter): | ||
| pass | ||
| def stream_into(self, query: AbstractDocumentQuery, output: BinaryIO) -> None: | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. let's add docstring here |
||
| stream_operation = StreamOperation(self._session) | ||
| command = stream_operation.create_request(query.index_query) | ||
|
|
||
| self.request_executor.execute_command(command, self.session_info) | ||
|
|
||
| with stream_operation.set_result(command.result) as result_iter: | ||
| output.write(b'{"Results":[') | ||
| first = True | ||
| for item in result_iter: | ||
| if not first: | ||
| output.write(b",") | ||
| output.write(json.dumps(item).encode("utf-8")) | ||
| first = False | ||
| output.write(b"]}") | ||
|
|
||
| def conditional_load( | ||
| self, key: str, change_vector: str, object_type: Type[_T] = None | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -5,6 +5,7 @@ | |
| import warnings | ||
| from copy import copy | ||
| from typing import ( | ||
| BinaryIO, | ||
| Generic, | ||
| TypeVar, | ||
| List, | ||
|
|
@@ -2757,6 +2758,9 @@ def suggest_using( | |
| self._suggest_using(suggestion_or_builder) | ||
| return SuggestionDocumentQuery(self) | ||
|
|
||
| def to_stream(self, output: BinaryIO) -> None: | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. let's add docstring here |
||
| self._the_session.advanced.stream_into(self, output) | ||
|
|
||
|
|
||
| class RawDocumentQuery(Generic[_T], AbstractDocumentQuery[_T]): | ||
| def __init__(self, object_type: Type[_T], session: InMemoryDocumentSessionOperations, raw_query: str): | ||
|
|
@@ -2834,6 +2838,9 @@ def projection(self, projection_behavior: ProjectionBehavior) -> RawDocumentQuer | |
| self._projection(projection_behavior) | ||
| return self | ||
|
|
||
| def to_stream(self, output: BinaryIO) -> None: | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. let's add docstring here |
||
| self._the_session.advanced.stream_into(self, output) | ||
|
|
||
|
|
||
| class DocumentQueryCustomizationDelegate(DocumentQueryCustomization): | ||
| def __init__(self, query: AbstractDocumentQuery): | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,106 @@ | ||
| """ | ||
| RDBC-1037: load_into_stream writes to BytesIO output; load_starting_with_into_stream | ||
| takes an output parameter and writes to it. | ||
|
|
||
| C# reference: IAdvancedSessionOperations.LoadIntoStream() / LoadStartingWithIntoStream() | ||
| """ | ||
|
|
||
| import io | ||
| import json | ||
| import unittest | ||
|
|
||
| from ravendb.tests.test_base import TestBase | ||
|
|
||
|
|
||
| class TestLoadIntoStreamUnit(unittest.TestCase): | ||
| """Unit tests — no server required.""" | ||
|
|
||
| def test_load_into_stream_method_exists_on_advanced(self): | ||
| from ravendb.documents.session.document_session import DocumentSession | ||
|
|
||
| self.assertTrue(hasattr(DocumentSession._Advanced, "load_into_stream")) | ||
|
|
||
| def test_load_starting_with_into_stream_method_exists(self): | ||
| from ravendb.documents.session.document_session import DocumentSession | ||
|
|
||
| self.assertTrue(hasattr(DocumentSession._Advanced, "load_starting_with_into_stream")) | ||
|
|
||
|
|
||
| class TestLoadIntoStream(TestBase): | ||
| """Integration tests — require a live server.""" | ||
|
|
||
| def setUp(self): | ||
| super().setUp() | ||
| self.store = self.get_document_store() | ||
|
|
||
| def tearDown(self): | ||
| super().tearDown() | ||
| self.store.close() | ||
|
|
||
| def test_load_into_stream_writes_to_bytesio(self): | ||
| class Doc: | ||
| def __init__(self, name: str = None): | ||
| self.name = name | ||
|
|
||
| with self.store.open_session() as session: | ||
| session.store(Doc("alpha"), "docs/1") | ||
| session.store(Doc("beta"), "docs/2") | ||
| session.save_changes() | ||
|
|
||
| output = io.BytesIO() | ||
| with self.store.open_session() as session: | ||
| session.advanced.load_into_stream(["docs/1", "docs/2"], output) | ||
|
|
||
| output.seek(0) | ||
| data = json.loads(output.read()) | ||
| self.assertIn("Results", data) | ||
| names = [r["name"] for r in data["Results"]] | ||
| self.assertIn("alpha", names) | ||
| self.assertIn("beta", names) | ||
|
|
||
| def test_load_into_stream_single_document(self): | ||
| class Doc: | ||
| def __init__(self, name: str = None): | ||
| self.name = name | ||
|
|
||
| with self.store.open_session() as session: | ||
| session.store(Doc("gamma"), "docs/3") | ||
| session.save_changes() | ||
|
|
||
| output = io.BytesIO() | ||
| with self.store.open_session() as session: | ||
| session.advanced.load_into_stream(["docs/3"], output) | ||
|
|
||
| output.seek(0) | ||
| data = json.loads(output.read()) | ||
| self.assertIn("Results", data) | ||
| self.assertEqual(1, len(data["Results"])) | ||
| self.assertEqual("gamma", data["Results"][0]["name"]) | ||
|
|
||
| def test_load_starting_with_into_stream_writes_to_bytesio(self): | ||
| class Doc: | ||
| def __init__(self, name: str = None): | ||
| self.name = name | ||
|
|
||
| with self.store.open_session() as session: | ||
| session.store(Doc("one"), "prefix/1") | ||
| session.store(Doc("two"), "prefix/2") | ||
| session.save_changes() | ||
|
|
||
| output = io.BytesIO() | ||
| with self.store.open_session() as session: | ||
| session.advanced.load_starting_with_into_stream("prefix/", output) | ||
|
|
||
| output.seek(0) | ||
| data = json.loads(output.read()) | ||
| self.assertIn("Results", data) | ||
| self.assertEqual(2, len(data["Results"])) | ||
| expected_names = {"one", "two"} | ||
| for r in data["Results"]: | ||
| self.assertIn(r["name"], expected_names) | ||
| expected_names.discard(r["name"]) | ||
| self.assertEqual(0, len(expected_names)) | ||
|
|
||
|
|
||
| if __name__ == "__main__": | ||
| unittest.main() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's add docstring here