|
| 1 | +from io import BytesIO |
1 | 2 | import json |
2 | 3 | from datetime import date |
| 4 | +import zipfile |
3 | 5 |
|
4 | 6 | from django.urls import reverse, reverse_lazy |
5 | 7 |
|
6 | 8 | import requests_mock |
7 | 9 | from django_webtest import WebTest |
8 | 10 | from freezegun import freeze_time |
9 | 11 | from maykin_2fa.test import disable_admin_mfa |
| 12 | +from webtest import Upload |
10 | 13 |
|
11 | 14 | from objects.accounts.tests.factories import SuperUserFactory |
12 | 15 | from objects.core.constants import ( |
13 | 16 | DataClassificationChoices, |
14 | 17 | ObjectTypeVersionStatus, |
15 | 18 | UpdateFrequencyChoices, |
16 | 19 | ) |
| 20 | +from objects.core.import_export import export_data |
17 | 21 | from objects.core.models import ObjectType |
18 | 22 | from objects.core.tests.factories import ObjectTypeFactory, ObjectTypeVersionFactory |
19 | 23 |
|
|
35 | 39 | class AdminAddTests(WebTest): |
36 | 40 | url = reverse_lazy("admin:core_objecttype_add") |
37 | 41 | import_from_url = reverse_lazy("admin:import_from_url") |
| 42 | + import_from_file = reverse_lazy("admin:import_from_file") |
38 | 43 |
|
39 | 44 | @classmethod |
40 | 45 | def setUpTestData(cls): |
@@ -248,6 +253,123 @@ def test_create_objecttype_from_url_with_nonexistent_url(self): |
248 | 253 | self.assertEqual(response.status_code, 200) |
249 | 254 | self.assertEqual(ObjectType.objects.count(), 0) |
250 | 255 |
|
| 256 | + def test_create_import_from_file_requires_authentication(self): |
| 257 | + self.app.set_user(None) |
| 258 | + import_view = self.app.get(self.import_from_file, expect_errors=True) |
| 259 | + self.assertEqual(import_view.status_code, 302) |
| 260 | + self.assertIn("/admin/login", import_view.location) |
| 261 | + |
| 262 | + def test_create_import_from_file(self): |
| 263 | + # setup an export file for import |
| 264 | + objecttype = ObjectTypeFactory.create() |
| 265 | + output = BytesIO() |
| 266 | + export_data(output, objecttypes=[objecttype]) |
| 267 | + export_file_content = output.getvalue() |
| 268 | + |
| 269 | + # submit the import form with the export file |
| 270 | + import_view = self.app.get(self.import_from_file) |
| 271 | + form = import_view.form |
| 272 | + form["export_file"] = Upload( |
| 273 | + "test-export.zip", export_file_content, "application/zip" |
| 274 | + ) |
| 275 | + response = form.submit() |
| 276 | + |
| 277 | + # verify redirection after successful import |
| 278 | + self.assertEqual(response.status_code, 302) |
| 279 | + self.assertEqual(response.location, "/admin/core/objecttype/") |
| 280 | + |
| 281 | + # check that the objecttype was imported |
| 282 | + self.assertEqual(ObjectType.objects.count(), 2) |
| 283 | + |
| 284 | + def test_create_import_from_file_invalid_zip(self): |
| 285 | + objecttype = ObjectTypeFactory.create() |
| 286 | + output = BytesIO() |
| 287 | + export_data(output, objecttypes=[objecttype]) |
| 288 | + # corrupt the zip content by truncating it |
| 289 | + export_file_content = output.getvalue()[:10] |
| 290 | + |
| 291 | + import_view = self.app.get(self.import_from_file) |
| 292 | + form = import_view.form |
| 293 | + form["export_file"] = Upload( |
| 294 | + "test-export.zip", export_file_content, "application/zip" |
| 295 | + ) |
| 296 | + response = form.submit() |
| 297 | + |
| 298 | + self.assertEqual(response.status_code, 200) |
| 299 | + self.assertContains(response, "is not a supported file type") |
| 300 | + |
| 301 | + def test_create_import_from_file_empty(self): |
| 302 | + import_view = self.app.get(self.import_from_file) |
| 303 | + form = import_view.form |
| 304 | + # create an empty zip file |
| 305 | + empty_zip = BytesIO() |
| 306 | + with zipfile.ZipFile(empty_zip, "w") as zf: |
| 307 | + zf.writestr("README.md", b"This is the wrong file") |
| 308 | + empty_zip.seek(0) |
| 309 | + |
| 310 | + form["export_file"] = Upload("empty.zip", empty_zip.read(), "application/zip") |
| 311 | + response = form.submit() |
| 312 | + |
| 313 | + self.assertEqual(response.status_code, 200) |
| 314 | + self.assertContains(response, "Found nothing importable in that file") |
| 315 | + |
| 316 | + def test_create_import_from_file_validation_error(self): |
| 317 | + objecttype = ObjectTypeFactory.create() |
| 318 | + |
| 319 | + # invalidate some property |
| 320 | + objecttype.update_frequency = 1024 * "x" |
| 321 | + |
| 322 | + output = BytesIO() |
| 323 | + export_data(output, objecttypes=[objecttype]) |
| 324 | + export_file_content = output.getvalue() |
| 325 | + |
| 326 | + import_view = self.app.get(self.import_from_file) |
| 327 | + form = import_view.form |
| 328 | + form["export_file"] = Upload( |
| 329 | + "corrupted-export.zip", export_file_content, "application/zip" |
| 330 | + ) |
| 331 | + response = form.submit() |
| 332 | + |
| 333 | + self.assertEqual(response.status_code, 200) |
| 334 | + self.assertContains(response, "Row") |
| 335 | + # this error is there if no other errors were found, but nothing was |
| 336 | + # imported, because some random zip was uploaded |
| 337 | + self.assertNotContains(response, "Found nothing importable in that file") |
| 338 | + |
| 339 | + def test_create_import_from_file_no_file_selected(self): |
| 340 | + import_view = self.app.get(self.import_from_file) |
| 341 | + form = import_view.form |
| 342 | + response = form.submit() |
| 343 | + |
| 344 | + self.assertEqual(response.status_code, 200) |
| 345 | + self.assertContains(response, "This field is required.") |
| 346 | + |
| 347 | + def test_export_action(self): |
| 348 | + objecttype1 = ObjectTypeFactory.create(name="Tree 1") |
| 349 | + objecttype2 = ObjectTypeFactory.create(name="Tree 2") |
| 350 | + |
| 351 | + response = self.app.get(reverse("admin:core_objecttype_changelist")) |
| 352 | + form = response.forms["changelist-form"] |
| 353 | + form["action"] = "export_objecttypes_action" |
| 354 | + form["_selected_action"] = [objecttype1.pk, objecttype2.pk] |
| 355 | + response = form.submit() |
| 356 | + |
| 357 | + self.assertEqual(response.status_code, 200) |
| 358 | + self.assertEqual(response["Content-Type"], "application/zip") |
| 359 | + self.assertIn( |
| 360 | + 'attachment; filename="objecttypes-export.zip"', |
| 361 | + response["Content-Disposition"], |
| 362 | + ) |
| 363 | + |
| 364 | + # verify the exported content |
| 365 | + output = BytesIO(response.content) |
| 366 | + with zipfile.ZipFile(output, "r") as zf: |
| 367 | + self.assertIn("objectTypes.json", zf.namelist()) |
| 368 | + |
| 369 | + with zf.open("objectTypes.json") as f: |
| 370 | + data = json.load(f) |
| 371 | + self.assertEqual({d["name"] for d in data}, {"Tree 1", "Tree 2"}) |
| 372 | + |
251 | 373 |
|
252 | 374 | @disable_admin_mfa() |
253 | 375 | class AdminDetailTests(WebTest): |
|
0 commit comments