-
Notifications
You must be signed in to change notification settings - Fork 1.1k
PYTHON-5779 Coverage increase for compression_support.py
#2770
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Draft
aclark4life
wants to merge
5
commits into
mongodb:master
Choose a base branch
from
aclark4life:PYTHON-5779
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+238
−0
Draft
Changes from all commits
Commits
Show all changes
5 commits
Select commit
Hold shift + click to select a range
19d6905
PYTHON-5779 Increase code coverage for compression_support.py from 64…
aclark4life 10598e3
PYTHON-5779 Use 'from test import unittest' to match test suite conve…
aclark4life 0721490
PYTHON-5779 Fix environment-dependent snappy test, strengthen multipl…
aclark4life 9b482d5
Noah feedback
aclark4life a0af3a6
Copilot feedback
aclark4life File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,238 @@ | ||
| # Copyright 2026-present MongoDB, Inc. | ||
| # | ||
| # Licensed under the Apache License, Version 2.0 (the "License"); | ||
| # you may not use this file except in compliance with the License. | ||
| # You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, software | ||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. | ||
|
|
||
| """Unit tests for compression_support.py.""" | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| import sys | ||
| from unittest.mock import patch | ||
|
|
||
| sys.path[0:0] = [""] | ||
|
|
||
| from test import unittest | ||
|
|
||
| from pymongo.compression_support import ( | ||
| CompressionSettings, | ||
| SnappyContext, | ||
| ZlibContext, | ||
| ZstdContext, | ||
| _have_snappy, | ||
| _have_zlib, | ||
| _have_zstd, | ||
| decompress, | ||
| validate_compressors, | ||
| validate_zlib_compression_level, | ||
| ) | ||
|
|
||
|
|
||
| class TestValidateCompressors(unittest.TestCase): | ||
| def test_string_input_single(self): | ||
| with patch("pymongo.compression_support._have_zlib", return_value=True): | ||
| result = validate_compressors(None, "zlib") | ||
| self.assertEqual(result, ["zlib"]) | ||
|
|
||
| def test_string_input_comma_separated(self): | ||
| with patch("pymongo.compression_support._have_zlib", return_value=True), patch( | ||
| "pymongo.compression_support._have_snappy", return_value=True | ||
| ): | ||
| result = validate_compressors(None, "zlib,snappy") | ||
| self.assertEqual(result, ["zlib", "snappy"]) | ||
|
|
||
| def test_iterable_input(self): | ||
| with patch("pymongo.compression_support._have_zlib", return_value=True): | ||
| result = validate_compressors(None, ["zlib"]) | ||
| self.assertEqual(result, ["zlib"]) | ||
|
aclark4life marked this conversation as resolved.
|
||
|
|
||
| def test_unsupported_compressor_warns_and_removes(self): | ||
| with self.assertWarns(UserWarning) as ctx: | ||
| result = validate_compressors(None, ["bogus"]) | ||
| self.assertEqual(result, []) | ||
| self.assertIn("Unsupported compressor: bogus", str(ctx.warning)) | ||
|
|
||
| def test_snappy_unavailable_warns_and_removes(self): | ||
| with patch("pymongo.compression_support._have_snappy", return_value=False): | ||
| with self.assertWarns(UserWarning) as ctx: | ||
| result = validate_compressors(None, ["snappy"]) | ||
| self.assertEqual(result, []) | ||
| self.assertIn("python-snappy", str(ctx.warning)) | ||
|
|
||
| def test_zlib_unavailable_warns_and_removes(self): | ||
| with patch("pymongo.compression_support._have_zlib", return_value=False): | ||
| with self.assertWarns(UserWarning) as ctx: | ||
| result = validate_compressors(None, ["zlib"]) | ||
| self.assertEqual(result, []) | ||
| self.assertIn("zlib", str(ctx.warning)) | ||
|
|
||
| def test_zstd_unavailable_warns_and_removes_pre_314(self): | ||
| if sys.version_info >= (3, 14): | ||
| self.skipTest("Python 3.14+ uses different warning message") | ||
| with patch("pymongo.compression_support._have_zstd", return_value=False): | ||
| with self.assertWarns(UserWarning) as ctx: | ||
| result = validate_compressors(None, ["zstd"]) | ||
| self.assertEqual(result, []) | ||
| self.assertIn("backports.zstd", str(ctx.warning)) | ||
|
|
||
| def test_zstd_unavailable_warns_and_removes_314_plus(self): | ||
| if sys.version_info < (3, 14): | ||
| self.skipTest("Only applies to Python 3.14+") | ||
| with patch("pymongo.compression_support._have_zstd", return_value=False): | ||
| with self.assertWarns(UserWarning) as ctx: | ||
| result = validate_compressors(None, ["zstd"]) | ||
| self.assertEqual(result, []) | ||
| self.assertIn("compression.zstd", str(ctx.warning)) | ||
|
|
||
| def test_multiple_valid_compressors_preserves_order(self): | ||
| with patch("pymongo.compression_support._have_zlib", return_value=True), patch( | ||
| "pymongo.compression_support._have_snappy", return_value=True | ||
| ): | ||
| result = validate_compressors(None, ["zlib", "snappy"]) | ||
| self.assertEqual(result, ["zlib", "snappy"]) | ||
|
|
||
| def test_empty_list_returns_empty(self): | ||
| result = validate_compressors(None, []) | ||
| self.assertEqual(result, []) | ||
|
|
||
|
|
||
| class TestValidateZlibCompressionLevel(unittest.TestCase): | ||
| def test_valid_minimum(self): | ||
| self.assertEqual(validate_zlib_compression_level("level", -1), -1) | ||
|
|
||
| def test_valid_maximum(self): | ||
| self.assertEqual(validate_zlib_compression_level("level", 9), 9) | ||
|
|
||
| def test_non_integer_raises_type_error(self): | ||
| with self.assertRaises(TypeError) as ctx: | ||
| validate_zlib_compression_level("level", "abc") | ||
| self.assertIn("must be an integer", str(ctx.exception)) | ||
|
|
||
| def test_too_low_raises_value_error(self): | ||
| with self.assertRaises(ValueError) as ctx: | ||
| validate_zlib_compression_level("level", -2) | ||
| self.assertIn("must be between -1 and 9", str(ctx.exception)) | ||
|
|
||
| def test_too_high_raises_value_error(self): | ||
| with self.assertRaises(ValueError) as ctx: | ||
| validate_zlib_compression_level("level", 10) | ||
| self.assertIn("must be between -1 and 9", str(ctx.exception)) | ||
|
|
||
| def test_string_integer_is_coerced(self): | ||
| self.assertEqual(validate_zlib_compression_level("level", "5"), 5) | ||
|
|
||
|
|
||
| class TestCompressionSettings(unittest.TestCase): | ||
| def _make(self, compressors=None, level=-1): | ||
| return CompressionSettings(compressors or [], level) | ||
|
|
||
| def test_get_context_none_when_empty(self): | ||
| settings = self._make() | ||
| self.assertIsNone(settings.get_compression_context([])) | ||
|
|
||
| def test_get_context_none_when_none(self): | ||
| settings = self._make() | ||
| self.assertIsNone(settings.get_compression_context(None)) | ||
|
|
||
| def test_get_context_snappy(self): | ||
| settings = self._make() | ||
| ctx = settings.get_compression_context(["snappy"]) | ||
| self.assertIsInstance(ctx, SnappyContext) | ||
|
|
||
| def test_get_context_zlib(self): | ||
| settings = self._make() | ||
| ctx = settings.get_compression_context(["zlib"]) | ||
| self.assertIsInstance(ctx, ZlibContext) | ||
|
|
||
| def test_get_context_zstd(self): | ||
| settings = self._make() | ||
| ctx = settings.get_compression_context(["zstd"]) | ||
| self.assertIsInstance(ctx, ZstdContext) | ||
|
|
||
| def test_get_context_uses_first_compressor(self): | ||
| settings = self._make() | ||
| ctx = settings.get_compression_context(["zlib", "snappy"]) | ||
| self.assertIsInstance(ctx, ZlibContext) | ||
|
|
||
| def test_get_context_unknown_returns_none(self): | ||
| settings = self._make() | ||
| ctx = settings.get_compression_context(["unknown"]) | ||
| self.assertIsNone(ctx) | ||
|
|
||
|
|
||
| class TestZlibContext(unittest.TestCase): | ||
| def setUp(self): | ||
| if not _have_zlib(): | ||
| self.skipTest("zlib not available") | ||
|
|
||
| def test_compress_and_decompress_roundtrip(self): | ||
| import zlib | ||
|
|
||
| ctx = ZlibContext(level=-1) | ||
| data = b"hello world" * 100 | ||
| compressed = ctx.compress(data) | ||
| self.assertEqual(zlib.decompress(compressed), data) | ||
|
|
||
|
|
||
| class TestDecompress(unittest.TestCase): | ||
| def test_unknown_compressor_id_raises(self): | ||
| with self.assertRaises(ValueError) as ctx: | ||
| decompress(b"data", 99) | ||
| self.assertIn("Unknown compressorId 99", str(ctx.exception)) | ||
|
|
||
| def test_zlib_roundtrip(self): | ||
| if not _have_zlib(): | ||
| self.skipTest("zlib not available") | ||
| import zlib | ||
|
|
||
| data = b"hello world" | ||
| compressed = zlib.compress(data) | ||
| result = decompress(compressed, ZlibContext.compressor_id) | ||
| self.assertEqual(result, data) | ||
|
|
||
| def test_zlib_with_memoryview(self): | ||
| if not _have_zlib(): | ||
| self.skipTest("zlib not available") | ||
| import zlib | ||
|
|
||
| data = b"test data" | ||
| compressed = zlib.compress(data) | ||
| result = decompress(memoryview(compressed), ZlibContext.compressor_id) | ||
| self.assertEqual(result, data) | ||
|
aclark4life marked this conversation as resolved.
aclark4life marked this conversation as resolved.
|
||
|
|
||
| def test_snappy_roundtrip(self): | ||
| if not _have_snappy(): | ||
| self.skipTest("python-snappy not installed") | ||
| data = b"hello world" * 50 | ||
| compressed = SnappyContext.compress(data) | ||
| result = decompress(compressed, SnappyContext.compressor_id) | ||
| self.assertEqual(result, data) | ||
|
Comment on lines
+217
to
+218
|
||
|
|
||
| def test_snappy_with_memoryview(self): | ||
| if not _have_snappy(): | ||
| self.skipTest("python-snappy not installed") | ||
| data = b"hello world" * 50 | ||
| compressed = SnappyContext.compress(data) | ||
| result = decompress(memoryview(compressed), SnappyContext.compressor_id) | ||
| self.assertEqual(result, data) | ||
|
|
||
| def test_zstd_roundtrip(self): | ||
| if not _have_zstd(): | ||
| self.skipTest("zstd not available") | ||
| data = b"hello world" * 50 | ||
| compressed = ZstdContext.compress(data) | ||
| result = decompress(compressed, ZstdContext.compressor_id) | ||
| self.assertEqual(result, data) | ||
|
|
||
|
|
||
| if __name__ == "__main__": | ||
| unittest.main() | ||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.