Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 49 additions & 20 deletions ravendb/documents/session/document_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import time
import uuid
from typing import (
BinaryIO,
Union,
Callable,
TYPE_CHECKING,
Expand Down Expand Up @@ -367,21 +368,23 @@ def _load_internal(

return load_operation.get_documents(object_type)

def _load_internal_stream(self, keys: List[str], operation: LoadOperation, stream: Optional[bytes] = None) -> None:
def _load_internal_stream(
self, keys: List[str], operation: LoadOperation, stream: Optional[BinaryIO] = None
) -> None:
operation.by_keys(keys)

command = operation.create_request()

if command:
self._request_executor.execute_command(command, self.session_info)

if stream:
if stream is not None:
try:
result = command.result
stream_to_dict = json.loads(stream.decode("utf-8"))
result.__dict__.update(stream_to_dict)
except IOError as e:
raise RuntimeError(f"Unable to serialize returned value into stream {e.args[0]}", e)
data = json.dumps({k: v for k, v in result.to_json().items() if v is not None}).encode("utf-8")
stream.write(data)
except Exception as e:
raise RuntimeError("Unable to serialize returned value into stream") from e
else:
operation.set_result(command.result)

Expand All @@ -404,16 +407,19 @@ def load_starting_with(
def load_starting_with_into_stream(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's add docstring here

self,
id_prefix: str,
output: BinaryIO,
matches: str = None,
start: int = 0,
page_size: int = 25,
exclude: str = None,
start_after: str = None,
) -> bytes:
) -> None:
if id_prefix is None:
raise ValueError("Arg 'id_prefix' is cannot be None.")
return self._load_starting_with_into_stream_internal(
id_prefix, LoadStartingWithOperation(self), matches, start, page_size, exclude, start_after
raise ValueError("id_prefix cannot be None.")
if output is None:
raise ValueError("output cannot be None")
self._load_starting_with_into_stream_internal(
id_prefix, LoadStartingWithOperation(self), output, matches, start, page_size, exclude, start_after
)

def _load_starting_with_internal(
Expand All @@ -437,23 +443,23 @@ def _load_starting_with_into_stream_internal(
self,
id_prefix: str,
operation: LoadStartingWithOperation,
output: BinaryIO,
matches: str,
start: int,
page_size: int,
exclude: str,
start_after: str,
) -> bytes:
) -> None:
operation.with_start_with(id_prefix, matches, start, page_size, exclude, start_after)
command = operation.create_request()
bytes_result = None
if command:
self.request_executor.execute_command(command, self.session_info)
try:
result = command.result
bytes_result = json.dumps(result.to_json()).encode("utf-8")
data = json.dumps({k: v for k, v in result.to_json().items() if v is not None}).encode("utf-8")
output.write(data)
except Exception as e:
raise RuntimeError("Unable sto serialize returned value into stream") from e
return bytes_result
raise RuntimeError("Unable to serialize returned value into stream") from e

def document_query_from_index_type(self, index_type: Type[_TIndex], object_type: Type[_T]) -> DocumentQuery[_T]:
try:
Expand Down Expand Up @@ -873,19 +879,29 @@ def load_starting_with(
def load_starting_with_into_stream(
self,
id_prefix: str,
output: BinaryIO,
matches: str = None,
start: int = 0,
page_size: int = 25,
exclude: str = None,
start_after: str = None,
) -> bytes:
) -> None:
return self._session.load_starting_with_into_stream(
id_prefix, matches, start, page_size, exclude, start_after
id_prefix, output, matches, start, page_size, exclude, start_after
)

def load_into_stream(self, keys: List[str], output: bytes) -> None:
def load_into_stream(self, keys: List[str], output: BinaryIO) -> None:
    """Fetch the documents identified by ``keys`` and dump the raw JSON
    server response into ``output`` as bytes.

    This is the streaming counterpart of ``load()``: nothing is
    deserialized into tracked entities, which makes it suitable for
    forwarding the payload to a file or an HTTP response. ``output``
    must support binary writes, e.g. ``io.BytesIO`` or a file opened
    with ``open(..., "wb")``.
    """
    # Validate arguments up front; the keys check is reported first.
    if keys is None:
        raise ValueError("Keys cannot be None")
    if output is None:
        raise ValueError("output cannot be None")

    session = self._session
    session._load_internal_stream(keys, LoadOperation(session), output)

Expand Down Expand Up @@ -1172,8 +1188,21 @@ def _yield_result(self, query: AbstractDocumentQuery, enumerator: Iterator[Dict]
object_type=query.query_class,
)

def stream_into(self): # query: Union[DocumentQuery, RawDocumentQuery], output: iter):
pass
def stream_into(self, query: AbstractDocumentQuery, output: BinaryIO) -> None:
    """Execute ``query`` and stream its raw results into ``output``.

    The response is written as a single UTF-8 encoded JSON document of
    the form ``{"Results": [...]}`` without materializing the result
    set into tracked entities.

    Args:
        query: The query whose ``index_query`` is sent to the server.
        output: A binary-writable stream (e.g. ``io.BytesIO`` or a
            file opened with ``"wb"``) that receives the encoded bytes.
    """
    stream_operation = StreamOperation(self._session)
    command = stream_operation.create_request(query.index_query)

    self.request_executor.execute_command(command, self.session_info)

    # set_result() yields a context-managed iterator of result dicts;
    # assemble the surrounding JSON array by hand so each item is
    # written as it arrives instead of buffering the whole list.
    with stream_operation.set_result(command.result) as result_iter:
        output.write(b'{"Results":[')
        first = True
        for item in result_iter:
            if not first:
                output.write(b",")
            output.write(json.dumps(item).encode("utf-8"))
            first = False
        output.write(b"]}")

def conditional_load(
self, key: str, change_vector: str, object_type: Type[_T] = None
Expand Down
7 changes: 7 additions & 0 deletions ravendb/documents/session/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import warnings
from copy import copy
from typing import (
BinaryIO,
Generic,
TypeVar,
List,
Expand Down Expand Up @@ -2757,6 +2758,9 @@ def suggest_using(
self._suggest_using(suggestion_or_builder)
return SuggestionDocumentQuery(self)

def to_stream(self, output: BinaryIO) -> None:
    """Execute this query and write its raw JSON results into ``output``.

    Delegates to ``advanced.stream_into``, which writes the response as
    UTF-8 encoded bytes of the form ``{"Results": [...]}`` instead of
    deserializing results into tracked entities.

    Args:
        output: A binary-writable stream such as ``io.BytesIO`` or a
            file opened with ``"wb"``.
    """
    self._the_session.advanced.stream_into(self, output)


class RawDocumentQuery(Generic[_T], AbstractDocumentQuery[_T]):
def __init__(self, object_type: Type[_T], session: InMemoryDocumentSessionOperations, raw_query: str):
Expand Down Expand Up @@ -2834,6 +2838,9 @@ def projection(self, projection_behavior: ProjectionBehavior) -> RawDocumentQuer
self._projection(projection_behavior)
return self

def to_stream(self, output: BinaryIO) -> None:
    """Execute this raw query and write its raw JSON results into ``output``.

    Delegates to ``advanced.stream_into``, which writes the response as
    UTF-8 encoded bytes of the form ``{"Results": [...]}`` instead of
    deserializing results into tracked entities.

    Args:
        output: A binary-writable stream such as ``io.BytesIO`` or a
            file opened with ``"wb"``.
    """
    self._the_session.advanced.stream_into(self, output)


class DocumentQueryCustomizationDelegate(DocumentQueryCustomization):
def __init__(self, query: AbstractDocumentQuery):
Expand Down
106 changes: 106 additions & 0 deletions ravendb/tests/issue_tests/test_RDBC_1037.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
"""
RDBC-1037: load_into_stream writes to BytesIO output; load_starting_with_into_stream
takes an output parameter and writes to it.

C# reference: IAdvancedSessionOperations.LoadIntoStream() / LoadStartingWithIntoStream()
"""

import io
import json
import unittest

from ravendb.tests.test_base import TestBase


class TestLoadIntoStreamUnit(unittest.TestCase):
    """API-surface checks that require no running server."""

    @staticmethod
    def _advanced_cls():
        # Resolve the class under test in one place; imported lazily so
        # merely collecting this module never touches a server.
        from ravendb.documents.session.document_session import DocumentSession

        return DocumentSession._Advanced

    def test_load_into_stream_method_exists_on_advanced(self):
        self.assertTrue(hasattr(self._advanced_cls(), "load_into_stream"))

    def test_load_starting_with_into_stream_method_exists(self):
        self.assertTrue(hasattr(self._advanced_cls(), "load_starting_with_into_stream"))


class TestLoadIntoStream(TestBase):
    """Round-trip tests for the stream-loading APIs — require a live server."""

    class Doc:
        # Minimal document shape shared by every test in this class.
        def __init__(self, name: str = None):
            self.name = name

    def setUp(self):
        super().setUp()
        self.store = self.get_document_store()

    def tearDown(self):
        super().tearDown()
        self.store.close()

    def _store_docs(self, pairs):
        # Persist (name, key) pairs in a single session.
        with self.store.open_session() as session:
            for name, key in pairs:
                session.store(self.Doc(name), key)
            session.save_changes()

    def _read_json(self, output):
        # Rewind the stream and decode whatever the session wrote into it.
        output.seek(0)
        return json.loads(output.read())

    def test_load_into_stream_writes_to_bytesio(self):
        self._store_docs([("alpha", "docs/1"), ("beta", "docs/2")])

        output = io.BytesIO()
        with self.store.open_session() as session:
            session.advanced.load_into_stream(["docs/1", "docs/2"], output)

        data = self._read_json(output)
        self.assertIn("Results", data)
        names = [r["name"] for r in data["Results"]]
        self.assertIn("alpha", names)
        self.assertIn("beta", names)

    def test_load_into_stream_single_document(self):
        self._store_docs([("gamma", "docs/3")])

        output = io.BytesIO()
        with self.store.open_session() as session:
            session.advanced.load_into_stream(["docs/3"], output)

        data = self._read_json(output)
        self.assertIn("Results", data)
        self.assertEqual(1, len(data["Results"]))
        self.assertEqual("gamma", data["Results"][0]["name"])

    def test_load_starting_with_into_stream_writes_to_bytesio(self):
        self._store_docs([("one", "prefix/1"), ("two", "prefix/2")])

        output = io.BytesIO()
        with self.store.open_session() as session:
            session.advanced.load_starting_with_into_stream("prefix/", output)

        data = self._read_json(output)
        self.assertIn("Results", data)
        self.assertEqual(2, len(data["Results"]))
        self.assertEqual({"one", "two"}, {r["name"] for r in data["Results"]})


# Allow running this test module directly (``python test_RDBC_1037.py``).
if __name__ == "__main__":
    unittest.main()
Loading
Loading