diff --git a/core/powershell/dlp.py b/core/powershell/dlp.py new file mode 100644 index 00000000..c146fcd1 --- /dev/null +++ b/core/powershell/dlp.py @@ -0,0 +1,39 @@ +import json +from core.powershell.client import PowerShellClient + +class DlpService: + def __init__(self, client: PowerShellClient): + self.client = client + + def fetch_dlp_policies(self) -> list: + """Locates certificate, triggers powershell execution, and parses DLP compliance policies.""" + try: + cert_path = self.client.locate_certificate() + except Exception as e: + raise RuntimeError(f"Failed to locate certificate for authentication: {str(e)}") + + args = [ + "-AppId", self.client.client_id, + "-Organization", self.client.tenant_id, + "-CertificatePath", cert_path + ] + if self.client.cert_password: + args += ["-CertificatePassword", self.client.cert_password] + + raw_output = self.client.execute_script("scripts/get_dlp_policies.ps1", args) + + if not raw_output or not raw_output.strip(): + return [] + + try: + return json.loads(raw_output) + except json.JSONDecodeError: + # Parse JSON block in case of warning/header lines outputted by powershell environment + lines = raw_output.strip().split('\n') + json_str = "" + for line in lines: + if line.startswith("[") or line.startswith("{") or json_str: + json_str += line + if json_str: + return json.loads(json_str) + raise RuntimeError(f"PowerShell returned non-JSON format: {raw_output}") diff --git a/core/powershell/scripts/get_dlp_policies.ps1 b/core/powershell/scripts/get_dlp_policies.ps1 new file mode 100644 index 00000000..3b8db247 --- /dev/null +++ b/core/powershell/scripts/get_dlp_policies.ps1 @@ -0,0 +1,115 @@ +param( + [Parameter(Mandatory=$true)] + [string]$AppId, + [Parameter(Mandatory=$true)] + [string]$Organization, + [Parameter(Mandatory=$true)] + [string]$CertificatePath, + [Parameter(Mandatory=$false)] + [string]$CertificatePassword +) + +$ErrorActionPreference = "Stop" + +# Check if ExchangeOnlineManagement module is installed beforehand +if (-not (Get-Module -ListAvailable -Name ExchangeOnlineManagement)) { + throw "ExchangeOnlineManagement PowerShell module is not installed. Please install it beforehand by running: Install-Module -Name ExchangeOnlineManagement -Scope CurrentUser" +} + +Import-Module ExchangeOnlineManagement + +# Connect to Security & Compliance PowerShell using App-Only Cert Auth +if ($CertificatePassword) { + $secPassword = ConvertTo-SecureString -String $CertificatePassword -AsPlainText -Force + Connect-IPPSSession -CertificateFilePath $CertificatePath -CertificatePassword $secPassword -AppId $AppId -Organization $Organization -ShowBanner:$false -WarningAction SilentlyContinue +} else { + Connect-IPPSSession -CertificateFilePath $CertificatePath -AppId $AppId -Organization $Organization -ShowBanner:$false -WarningAction SilentlyContinue +} + +try { + # Retrieve policies + $policies = Get-DlpCompliancePolicy + $output = @() + + if ($policies) { + $policies_list = @($policies) + + foreach ($policy in $policies_list) { + # Retrieve rules for this policy + $rules = Get-DlpComplianceRule -Policy $policy.Identity.ToString() + $rulesOutput = @() + + if ($rules) { + foreach ($rule in @($rules)) { + # Convert actions and conditions/exceptions to strings for robust inspection + $actionsList = @() + if ($rule.Actions) { + foreach ($action in $rule.Actions) { + $actionsList += $action.ToString() + } + } + + $ruleObj = [PSCustomObject]@{ + Name = $rule.Name + Comment = $rule.Comment + Enabled = $rule.Enabled + Actions = $actionsList + } + $rulesOutput += $ruleObj + } + } + + # Format locations (can be lists of specific objects or strings, let's convert to arrays/strings) + $exchangeLoc = $null + if ($policy.ExchangeLocation) { + $exchangeLoc = @($policy.ExchangeLocation | ForEach-Object { $_.ToString() }) + } + + $sharepointLoc = $null + if ($policy.SharePointLocation) { + $sharepointLoc = @($policy.SharePointLocation | ForEach-Object { $_.ToString() }) + } + + $onedriveLoc = $null + if ($policy.OneDriveLocation) { + $onedriveLoc = @($policy.OneDriveLocation | ForEach-Object { $_.ToString() }) + } + + $teamsLoc = $null + if ($policy.TeamsLocation) { + $teamsLoc = @($policy.TeamsLocation | ForEach-Object { $_.ToString() }) + } + + $devicesLoc = $null + if ($policy.DevicesLocation) { + $devicesLoc = @($policy.DevicesLocation | ForEach-Object { $_.ToString() }) + } + + $policyObj = [PSCustomObject]@{ + Name = $policy.Name + Comment = $policy.Comment + ExchangeLocation = $exchangeLoc + SharePointLocation = $sharepointLoc + OneDriveLocation = $onedriveLoc + TeamsLocation = $teamsLoc + DevicesLocation = $devicesLoc + Mode = $policy.Mode.ToString() + DistributionStatus = $policy.DistributionStatus.ToString() + Enabled = $policy.Enabled + Identity = $policy.Identity.ToString() + WhenCreated = $policy.WhenCreated + WhenChanged = $policy.WhenChanged + CreatedBy = $policy.CreatedBy + LastModifiedBy = $policy.LastModifiedBy + Rules = $rulesOutput + } + $output += $policyObj + } + $output | ConvertTo-Json -Depth 5 + } else { + "[]" + } +} +finally { + Disconnect-ExchangeOnline -Confirm:$false +} diff --git a/core/powershell/scripts/get_retention_policies.ps1 b/core/powershell/scripts/get_retention_policies.ps1 index b151b147..960c015e 100644 --- a/core/powershell/scripts/get_retention_policies.ps1 +++ b/core/powershell/scripts/get_retention_policies.ps1 @@ -59,7 +59,9 @@ try { $trigger = "N/A" $rule = $null - if ($ruleMap.ContainsKey($policy.Name)) { + if ($policy.Guid -and $ruleMap.ContainsKey($policy.Guid.ToString())) { + $rule = $ruleMap[$policy.Guid.ToString()] + } elseif ($ruleMap.ContainsKey($policy.Name)) { $rule = $ruleMap[$policy.Name] } elseif ($ruleMap.ContainsKey($policy.Identity.ToString())) { $rule = $ruleMap[$policy.Identity.ToString()] diff --git a/flet_app/dashboard.py b/flet_app/dashboard.py index f7756f4c..509d8d21 100644 --- a/flet_app/dashboard.py +++ b/flet_app/dashboard.py @@ -16,7 +16,7 @@ from telemetry.power_automate import PowerAutomateScanner from telemetry.mailbox_usage import run_mailbox_usage_pipeline from telemetry.sharepoint_onedrive_usage import run_sharepoint_pipeline, run_onedrive_pipeline -from telemetry.data_security_governance import run_security_governance_pipeline +from telemetry.data_security_governance import run_security_governance_pipeline, format_dlp_workloads, analyze_thick_client_enforcement class DashboardView(ft.Container): def __init__(self, tenant, client, secret, on_disconnect): @@ -33,6 +33,7 @@ def __init__(self, tenant, client, secret, on_disconnect): # Saved data for CSV exports self.last_licenses_items = [] self.last_complex_flows = [] + self.last_dlp_policies_data = [] # Saved data for Sensitivity Labels pagination self.flattened_labels = [] @@ -117,6 +118,24 @@ def __init__(self, tenant, client, secret, on_disconnect): ) self.retention_section = self.create_card("Retention Compliance Policies", action_control=self.purview_btn, on_retry=self.handle_retry_security_gov) + # 9b. DLP Compliance Policies Card (with Purview Link & Export Button) + self.dlp_purview_btn = ft.TextButton( + content=ft.Text("Open Microsoft Purview Portal ↗", color=COLOR_PRIMARY, weight=ft.FontWeight.BOLD), + on_click=lambda e: e.page.launch_url("https://purview.microsoft.com/datalossprevention/policies") + ) + self.export_dlp_btn = ft.IconButton( + icon=ft.Icons.DOWNLOAD, + icon_color=COLOR_PRIMARY, + tooltip="Export DLP Policies", + disabled=True, + on_click=self.handle_export_dlp + ) + self.dlp_section = self.create_card( + "DLP Compliance Policies", + action_control=ft.Row([self.dlp_purview_btn, self.export_dlp_btn], spacing=10), + on_retry=self.handle_retry_security_gov + ) + # 10. Power Automate Card (with Export Button) self.export_pa_btn = ft.IconButton( icon=ft.Icons.DOWNLOAD, @@ -138,6 +157,7 @@ def __init__(self, tenant, client, secret, on_disconnect): self.onedrive_section, self.labels_section, self.retention_section, + self.dlp_section, self.pa_section ], scroll=ft.ScrollMode.AUTO, @@ -274,6 +294,8 @@ def handle_retry_security_gov(self, e): self.labels_section.update() self.set_loading(self.retention_section, "Retrieving Retention policies...") self.retention_section.update() + self.set_loading(self.dlp_section, "Retrieving DLP policies...") + self.dlp_section.update() threading.Thread(target=self.fetch_security_gov, daemon=True).start() @@ -333,11 +355,13 @@ def handle_fetch(self, e): self.onedrive_section.update() threading.Thread(target=self.fetch_onedrive, daemon=True).start() - # 8 & 9. Sensitivity Labels & Retention Policies + # 8 & 9. Sensitivity Labels & Retention Policies & DLP Policies self.set_loading(self.labels_section, "Retrieving Sensitivity labels...") self.labels_section.update() self.set_loading(self.retention_section, "Retrieving Retention policies...") self.retention_section.update() + self.set_loading(self.dlp_section, "Retrieving DLP policies...") + self.dlp_section.update() threading.Thread(target=self.fetch_security_gov, daemon=True).start() # 10. Power Automate @@ -780,18 +804,79 @@ def fetch_security_gov(self): ) self.retention_section.content_container.content = ft.Column([table], scroll=ft.ScrollMode.AUTO, height=300) + # 3. Populate DLP Policies + dlp_policies = data.get("dlp_policies") + dlp_policies_error = data.get("dlp_policies_error") + self.last_dlp_policies_data = dlp_policies + + if dlp_policies_error: + msg = dlp_policies_error + if "is not installed or not in PATH" in dlp_policies_error.lower(): + msg = "PowerShell Core ('pwsh') is not installed or configured on this machine." + elif "exchangeonlinemanagement" in dlp_policies_error.lower(): + msg = "ExchangeOnlineManagement PowerShell module is missing." + elif "get-dlpcompliancypolicy" in dlp_policies_error.lower() and "not recognized" in dlp_policies_error.lower(): + msg = "DLP Compliance Management permissions required. Please assign the correct role in Microsoft Purview." + self.dlp_section.content_container.content = ft.Text(f"Error loading policies: {msg}", color=COLOR_ERROR) + self.export_dlp_btn.disabled = True + elif not dlp_policies: + self.dlp_section.content_container.content = ft.Text("No DLP Compliance Policies found.", color=COLOR_TEXT_SUB) + self.export_dlp_btn.disabled = True + else: + self.export_dlp_btn.disabled = False + policies_list = dlp_policies if isinstance(dlp_policies, list) else [dlp_policies] + rows = [] + for policy in policies_list: + workloads_str = format_dlp_workloads(policy) + thick_client_status = analyze_thick_client_enforcement(policy) + + enabled_val = policy.get("Enabled", True) + is_enabled = enabled_val.lower() == "true" if isinstance(enabled_val, str) else bool(enabled_val) + status_str = "🟢 Enabled" if is_enabled else "🔴 Disabled" + + rows.append(ft.DataRow(cells=[ + ft.DataCell(ft.Column([ + ft.Text(policy.get("Name", "N/A"), weight=ft.FontWeight.BOLD), + ft.Text(policy.get("Comment", ""), size=11, color=COLOR_TEXT_SUB) if policy.get("Comment") else ft.Container() + ], alignment=ft.MainAxisAlignment.CENTER)), + ft.DataCell(ft.Text(workloads_str)), + ft.DataCell(ft.Text(policy.get("Mode", "Enforce"))), + ft.DataCell(ft.Text(thick_client_status)), + ft.DataCell(ft.Text(status_str)) + ])) + + table = ft.DataTable( + columns=[ + ft.DataColumn(ft.Text("Policy Name", weight=ft.FontWeight.BOLD)), + ft.DataColumn(ft.Text("Workloads", weight=ft.FontWeight.BOLD)), + ft.DataColumn(ft.Text("Mode", weight=ft.FontWeight.BOLD)), + ft.DataColumn(ft.Text("Thick Client DLP Enforcement", weight=ft.FontWeight.BOLD)), + ft.DataColumn(ft.Text("Status", weight=ft.FontWeight.BOLD)), + ], + rows=rows, + border=ft.Border.all(1, COLOR_OUTLINE_LIGHT), + border_radius=8, + heading_row_color=COLOR_TONAL_BG, + ) + self.dlp_section.content_container.content = ft.Column([table], scroll=ft.ScrollMode.AUTO, height=300) + self.mark_complete("security_gov", "success") except Exception as e: self.set_error(self.labels_section, str(e)) self.set_error(self.retention_section, str(e)) + self.set_error(self.dlp_section, str(e)) self.labels_pagination_row.visible = False + self.export_dlp_btn.disabled = True self.mark_complete("security_gov", "error") finally: self.clear_loading(self.labels_section) self.clear_loading(self.retention_section) + self.clear_loading(self.dlp_section) if self.page: self.labels_section.update() self.retention_section.update() + self.dlp_section.update() + self.export_dlp_btn.update() def render_labels_page(self): total_items = len(self.flattened_labels) @@ -988,3 +1073,58 @@ def on_save_result(save_event: ft.FilePickerResultEvent): e.page.update() ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") picker.save_file(file_name=f"complex_flows_{ts}.csv") + + def handle_export_dlp(self, e): + def on_save_result(save_event: ft.FilePickerResultEvent): + if save_event.path: + try: + headers = [ + "Policy Name", "Identity", "Description / Comment", "Workloads", "Mode", + "Distribution Status", "Is Enabled", "Thick Client DLP Enforcement", + "Exchange Locations", "SharePoint Locations", "OneDrive Locations", + "Teams Locations", "Devices Locations", "Rules Configured" + ] + rows = [] + policies_list = self.last_dlp_policies_data if isinstance(self.last_dlp_policies_data, list) else [self.last_dlp_policies_data] + + for policy in policies_list: + workloads_str = format_dlp_workloads(policy) + thick_client_status = analyze_thick_client_enforcement(policy) + + # Format rules details + rules = policy.get("Rules", []) + rules_details = [] + for r in rules: + actions_str = ", ".join(r.get("Actions", [])) + rules_details.append(f"Rule: {r.get('Name')} (Enabled: {r.get('Enabled')}) - Actions: [{actions_str}]") + rules_str = "\n".join(rules_details) + + rows.append([ + policy.get("Name", "N/A"), + policy.get("Identity", "N/A"), + policy.get("Comment", "N/A"), + workloads_str, + policy.get("Mode", "N/A"), + policy.get("DistributionStatus", "N/A"), + str(policy.get("Enabled", True)), + thick_client_status, + ", ".join(policy.get("ExchangeLocation") or []), + ", ".join(policy.get("SharePointLocation") or []), + ", ".join(policy.get("OneDriveLocation") or []), + ", ".join(policy.get("TeamsLocation") or []), + ", ".join(policy.get("DevicesLocation") or []), + rules_str + ]) + + with open(save_event.path, 'w', newline='', encoding='utf-8') as csvfile: + writer = csv.writer(csvfile) + writer.writerow(headers) + writer.writerows(rows) + except Exception as ex: + print(f"Failed to export DLP policies: {ex}") + + picker = ft.FilePicker(on_result=on_save_result) + e.page.overlay.append(picker) + e.page.update() + ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") + picker.save_file(file_name=f"dlp_compliance_policies_{ts}.csv") diff --git a/telemetry/data_security_governance.py b/telemetry/data_security_governance.py index dc124d06..4ac566b0 100644 --- a/telemetry/data_security_governance.py +++ b/telemetry/data_security_governance.py @@ -29,6 +29,54 @@ # Import shared styles from telemetry.styles import * +def format_dlp_workloads(policy) -> str: + """Formats active workloads/locations for a DLP policy as a friendly string.""" + active = [] + if policy.get("ExchangeLocation"): + active.append("Exchange") + if policy.get("SharePointLocation"): + active.append("SharePoint") + if policy.get("OneDriveLocation"): + active.append("OneDrive") + if policy.get("TeamsLocation"): + active.append("Teams") + if policy.get("DevicesLocation"): + active.append("Devices (Endpoints)") + return ", ".join(active) if active else "None" + +def analyze_thick_client_enforcement(policy) -> str: + """Analyzes the rules of a DLP policy to determine thick client (desktop app) enforcement details.""" + devices = policy.get("DevicesLocation") + if not devices: + return "Not Configured (Devices workload inactive)" + + rules = policy.get("Rules", []) + if not rules: + return "Devices Workload Active (No rules configured)" + + has_restricted_apps = False + has_other_endpoint_restrictions = False + + for rule in rules: + if not rule.get("Enabled", True): + continue + actions = rule.get("Actions", []) + for act in actions: + act_str = str(act).lower() + if "restrictedapp" in act_str: + has_restricted_apps = True + elif any(keyword in act_str for keyword in ["clipboard", "print", "removablemedia", "usb", "networkshare", "cloudlookup", "bluetooth"]): + has_other_endpoint_restrictions = True + + if has_restricted_apps and has_other_endpoint_restrictions: + return "Purview Restricted Apps & Device Restrictions (Clipboard/USB/Print)" + elif has_restricted_apps: + return "Purview Restricted Apps" + elif has_other_endpoint_restrictions: + return "Purview Device Restrictions (Clipboard/USB/Print)" + else: + return "Devices Workload Active (Audit/Alert Only)" + def run_security_governance_pipeline(client_id, client_secret, tenant_id) -> dict: """Pipeline specifically for security and governance policy data collection.""" usage_logger.info("Starting Data Security & Governance Pipeline...") @@ -82,16 +130,37 @@ def run_security_governance_pipeline(client_id, client_secret, tenant_id) -> dic usage_logger.error("Failed to fetch retention policies via PowerShell", exc_info=True) policies_error = str(e) - # Raise ConnectionError only if BOTH failed - if labels_error and policies_error: - raise ConnectionError(f"Security governance fetch failed.\nLabels Error: {labels_error}\nPolicies Error: {policies_error}") + # Fetch DLP Policies via PowerShell client + dlp_policies = None + dlp_policies_error = None + try: + from core.powershell.client import PowerShellClient + from core.powershell.dlp import DlpService + + ps_client = PowerShellClient(tenant_id=tenant_domain, client_id=client_id, client_secret=client_secret) + dlp_service = DlpService(ps_client) + dlp_policies = dlp_service.fetch_dlp_policies() + except Exception as e: + usage_logger.error("Failed to fetch DLP compliance policies via PowerShell", exc_info=True) + dlp_policies_error = str(e) + + # Raise ConnectionError only if ALL failed + if labels_error and policies_error and dlp_policies_error: + raise ConnectionError( + f"Security governance fetch failed.\n" + f"Labels Error: {labels_error}\n" + f"Policies Error: {policies_error}\n" + f"DLP Error: {dlp_policies_error}" + ) usage_logger.info("Data Security & Governance Pipeline completed successfully.") return { "labels": labels, "labels_error": labels_error, "policies": policies, - "policies_error": policies_error + "policies_error": policies_error, + "dlp_policies": dlp_policies, + "dlp_policies_error": dlp_policies_error } @@ -110,6 +179,7 @@ def __init__(self, master, log_callback, credentials_callback, status_change_cal self.ITEMS_PER_PAGE = 8 self.last_labels_data = None self.last_policies_data = None + self.last_dlp_policies_data = None self.build_ui() @@ -237,6 +307,7 @@ def build_ui(self): state="disabled" ) self.btn_export_retention.pack(side="right", anchor="e") + self.retention_grid = ctk.CTkFrame( self.inner_pad, fg_color=COLOR_SURFACE, @@ -244,6 +315,53 @@ def build_ui(self): border_width=1, corner_radius=8 ) + + # DLP Compliance Policies section + self.dlp_header_frame = ctk.CTkFrame(self.inner_pad, fg_color="transparent") + self.dlp_title = ctk.CTkLabel( + self.dlp_header_frame, + text="DLP Compliance Policies", + font=FONT_HEADER_SMALL, + text_color=COLOR_TEXT_MAIN + ) + self.dlp_title.pack(side="left", anchor="w") + + self.dlp_link = ctk.CTkLabel( + self.dlp_header_frame, + text="Open Microsoft Purview Portal ↗", + font=FONT_BODY_BOLD, + text_color=COLOR_PRIMARY, + cursor="hand2" + ) + self.dlp_link.pack(side="left", anchor="w", padx=(15, 0)) + self.dlp_link.bind("", lambda e: webbrowser.open("https://purview.microsoft.com/datalossprevention/policies")) + self.dlp_link.bind("", lambda e: self.dlp_link.configure(text_color=COLOR_PRIMARY_HOVER)) + self.dlp_link.bind("", lambda e: self.dlp_link.configure(text_color=COLOR_PRIMARY)) + + self.btn_export_dlp = ctk.CTkButton( + self.dlp_header_frame, + text="Export DLP Policies", + font=FONT_BODY_BOLD, + fg_color="transparent", + text_color=COLOR_PRIMARY, + border_width=1, + border_color=COLOR_OUTLINE, + hover_color=COLOR_SECONDARY_HOVER, + width=180, + height=32, + corner_radius=16, + command=self.export_dlp_csv, + state="disabled" + ) + self.btn_export_dlp.pack(side="right", anchor="e") + + self.dlp_grid = ctk.CTkFrame( + self.inner_pad, + fg_color=COLOR_SURFACE, + border_color=COLOR_OUTLINE_LIGHT, + border_width=1, + corner_radius=8 + ) self.reset_view() @@ -256,6 +374,8 @@ def reset_view(self): self.labels_header_frame.pack_forget() self.retention_header_frame.pack_forget() self.retention_grid.pack_forget() + self.dlp_header_frame.pack_forget() + self.dlp_grid.pack_forget() for w in self.state_frame.winfo_children(): w.destroy() @@ -263,6 +383,8 @@ def reset_view(self): w.destroy() for w in self.retention_grid.winfo_children(): w.destroy() + for w in self.dlp_grid.winfo_children(): + w.destroy() self.flattened_rows = [] self.current_page = 0 @@ -271,6 +393,8 @@ def reset_view(self): self.btn_export_labels.configure(state="disabled") if hasattr(self, "btn_export_retention"): self.btn_export_retention.configure(state="disabled") + if hasattr(self, "btn_export_dlp"): + self.btn_export_dlp.configure(state="disabled") def _set_state_loading(self, msg="Loading..."): for w in self.state_frame.winfo_children(): @@ -306,11 +430,14 @@ def trigger_fetch(self, tenant, client_id, client_secret): self.labels_header_frame.pack_forget() self.retention_header_frame.pack_forget() self.retention_grid.pack_forget() + self.dlp_header_frame.pack_forget() + self.dlp_grid.pack_forget() self._set_state_loading("Retrieving tenant Security & Compliance policies...") self.btn_export_labels.configure(state="disabled") self.btn_export_retention.configure(state="disabled") + self.btn_export_dlp.configure(state="disabled") threading.Thread( target=self._execute_worker, @@ -334,14 +461,19 @@ def _render_success(self, data: dict): w.destroy() for w in self.retention_grid.winfo_children(): w.destroy() + for w in self.dlp_grid.winfo_children(): + w.destroy() labels = data.get("labels") labels_error = data.get("labels_error") policies = data.get("policies") policies_error = data.get("policies_error") + dlp_policies = data.get("dlp_policies") + dlp_policies_error = data.get("dlp_policies_error") self.last_labels_data = labels self.last_policies_data = policies + self.last_dlp_policies_data = dlp_policies usage_logger.info(f"Sensitivity Labels fetched successfully. Total labels to render: {len(labels) if labels else 0}") self.status = "success" @@ -418,6 +550,11 @@ def _render_success(self, data: dict): self.retention_grid.pack(fill="x", expand=True, pady=(0, 15)) self._render_retention_policies(policies, policies_error) + # 3. Render DLP Policies Grid + self.dlp_header_frame.pack(fill="x", pady=(20, 10)) + self.dlp_grid.pack(fill="x", expand=True, pady=(0, 15)) + self._render_dlp_policies(dlp_policies, dlp_policies_error) + self.on_status_change() def _display_current_page(self): @@ -528,15 +665,18 @@ def _render_error(self, err_msg): self.on_status_change() self.btn_export_labels.configure(state="disabled") self.btn_export_retention.configure(state="disabled") + self.btn_export_dlp.configure(state="disabled") def _render_retention_policies(self, policies, policies_error): if policies_error: msg = policies_error # Provide helpful, friendly advice if pwsh or dependency issue - if "powershell" in policies_error.lower() or "pwsh" in policies_error.lower(): + if "is not installed or not in PATH" in policies_error.lower(): msg = "PowerShell Core ('pwsh') is not installed or configured on this machine.\nPlease refer to the Prerequisites in the README to configure it." elif "exchangeonlinemanagement" in policies_error.lower(): msg = "ExchangeOnlineManagement PowerShell module is missing.\nPlease run: Install-Module -Name ExchangeOnlineManagement -Scope CurrentUser" + elif "get-retentioncompliancypolicy" in policies_error.lower() and "not recognized" in policies_error.lower(): + msg = "Retention Management permissions required.\nPlease ensure the authenticated account is assigned the 'Retention Management' or 'Compliance Administrator' role in Microsoft Purview." ctk.CTkLabel( self.retention_grid, @@ -656,6 +796,103 @@ def _render_retention_policies(self, policies, policies_error): c5.grid(row=r_idx, column=5, sticky="nsew", padx=1, pady=1) ctk.CTkLabel(c5, text=status, font=FONT_BODY_BOLD, text_color=COLOR_TEXT_MAIN).pack(padx=10, pady=6, anchor="w") + def _render_dlp_policies(self, policies, policies_error): + if policies_error: + msg = policies_error + # Provide helpful, friendly advice if pwsh or dependency issue + if "is not installed or not in PATH" in policies_error.lower(): + msg = "PowerShell Core ('pwsh') is not installed or configured on this machine.\nPlease refer to the Prerequisites in the README to configure it." + elif "exchangeonlinemanagement" in policies_error.lower(): + msg = "ExchangeOnlineManagement PowerShell module is missing.\nPlease run: Install-Module -Name ExchangeOnlineManagement -Scope CurrentUser" + elif "get-dlpcompliancypolicy" in policies_error.lower() and "not recognized" in policies_error.lower(): + msg = "DLP Compliance Management permissions required.\nPlease ensure the authenticated account is assigned the 'DLP Compliance Management' or 'Compliance Administrator' role in Microsoft Purview." + + ctk.CTkLabel( + self.dlp_grid, + text=f"✖ {msg}", + font=FONT_BODY_MEDIUM, + text_color=COLOR_ERROR, + justify="center" + ).pack(padx=20, pady=20) + self.btn_export_dlp.configure(state="disabled") + elif policies is None or not policies: + ctk.CTkLabel( + self.dlp_grid, + text="No DLP Compliance Policies found in this tenant.", + font=FONT_BODY_MEDIUM, + text_color=COLOR_TEXT_SUB + ).pack(padx=20, pady=20) + self.btn_export_dlp.configure(state="disabled") + else: + self.btn_export_dlp.configure(state="normal") + # Configure grid columns + self.dlp_grid.grid_columnconfigure(0, weight=3) # Policy Name + self.dlp_grid.grid_columnconfigure(1, weight=3) # Workloads + self.dlp_grid.grid_columnconfigure(2, weight=1) # Mode + self.dlp_grid.grid_columnconfigure(3, weight=4) # Thick Client Enforcement + self.dlp_grid.grid_columnconfigure(4, weight=1) # Status + + headers = ["Policy Name", "Workloads", "Mode", "Thick Client DLP Enforcement", "Status"] + for col_idx, head_text in enumerate(headers): + cell = ctk.CTkFrame(self.dlp_grid, fg_color=COLOR_TONAL_BG, corner_radius=0) + cell.grid(row=0, column=col_idx, sticky="nsew", padx=1, pady=1) + ctk.CTkLabel(cell, text=head_text, font=FONT_BODY_BOLD, text_color=COLOR_TONAL_TEXT).pack(padx=10, pady=8, anchor="w") + + # Handle case where policies is a single dict rather than a list + policies_list = policies if isinstance(policies, list) else [policies] + + for r_idx, policy in enumerate(policies_list, start=1): + bg_style = "transparent" if r_idx % 2 != 0 else COLOR_SURFACE_VARIANT + + name = policy.get("Name", "N/A") + comment = policy.get("Comment", "") + workloads_str = format_dlp_workloads(policy) + mode = policy.get("Mode", "Enforce") + thick_client_status = analyze_thick_client_enforcement(policy) + + # Enabled can be boolean or string + enabled_val = policy.get("Enabled", True) + if isinstance(enabled_val, str): + is_enabled = enabled_val.lower() == "true" + else: + is_enabled = bool(enabled_val) + + status = "🟢 Enabled" if is_enabled else "🔴 Disabled" + + c0 = ctk.CTkFrame(self.dlp_grid, fg_color=bg_style, corner_radius=0) + c0.grid(row=r_idx, column=0, sticky="nsew", padx=1, pady=1) + + has_comment = bool(comment and comment != name) + lbl_name = ctk.CTkLabel(c0, text=name, font=FONT_BODY_BOLD, text_color=COLOR_TEXT_MAIN) + lbl_name.pack(padx=10, pady=(6, 2) if has_comment else 6, anchor="w") + + if has_comment: + lbl_comment = ctk.CTkLabel(c0, text=comment, font=FONT_BODY_SMALL, text_color=COLOR_TEXT_SUB) + lbl_comment.pack(padx=10, pady=(0, 6), anchor="w") + c0.bind("", lambda e, l1=lbl_name, l2=lbl_comment: (l1.configure(wraplength=e.width - 20), l2.configure(wraplength=e.width - 20))) + else: + c0.bind("", lambda e, l=lbl_name: l.configure(wraplength=e.width - 20)) + + c1 = ctk.CTkFrame(self.dlp_grid, fg_color=bg_style, corner_radius=0) + c1.grid(row=r_idx, column=1, sticky="nsew", padx=1, pady=1) + lbl_workload = ctk.CTkLabel(c1, text=workloads_str, font=FONT_BODY_MEDIUM, text_color=COLOR_TEXT_MAIN) + lbl_workload.pack(padx=10, pady=6, anchor="w") + c1.bind("", lambda e, l=lbl_workload: l.configure(wraplength=e.width - 20)) + + c2 = ctk.CTkFrame(self.dlp_grid, fg_color=bg_style, corner_radius=0) + c2.grid(row=r_idx, column=2, sticky="nsew", padx=1, pady=1) + ctk.CTkLabel(c2, text=mode, font=FONT_BODY_MEDIUM, text_color=COLOR_TEXT_MAIN).pack(padx=10, pady=6, anchor="w") + + c3 = ctk.CTkFrame(self.dlp_grid, fg_color=bg_style, corner_radius=0) + c3.grid(row=r_idx, column=3, sticky="nsew", padx=1, pady=1) + lbl_thick = ctk.CTkLabel(c3, text=thick_client_status, font=FONT_BODY_MEDIUM, text_color=COLOR_TEXT_MAIN, justify="left") + lbl_thick.pack(padx=10, pady=6, anchor="w") + c3.bind("", lambda e, l=lbl_thick: l.configure(wraplength=e.width - 20)) + + c4 = ctk.CTkFrame(self.dlp_grid, fg_color=bg_style, corner_radius=0) + c4.grid(row=r_idx, column=4, sticky="nsew", padx=1, pady=1) + ctk.CTkLabel(c4, text=status, font=FONT_BODY_BOLD, text_color=COLOR_TEXT_MAIN).pack(padx=10, pady=6, anchor="w") + def export_labels_csv(self): """Prompts the user to save sensitivity labels as a detailed CSV file.""" if not hasattr(self, "last_labels_data") or not self.last_labels_data: @@ -779,4 +1016,66 @@ def export_retention_csv(self): except Exception as e: messagebox.showerror("Export Failed", f"Failed to export CSV: {e}", parent=self) - + def export_dlp_csv(self): + """Prompts the user to save DLP compliance policies as a detailed CSV file.""" + if not hasattr(self, "last_dlp_policies_data") or not self.last_dlp_policies_data: + from tkinter import messagebox + messagebox.showinfo("No Data", "There is no DLP policies data to export. Please run a scan first.", parent=self) + return + + from tkinter import filedialog, messagebox + from datetime import datetime + import pandas as pd + + ts = datetime.now().strftime("%Y%m%d_%H%M%S") + f = filedialog.asksaveasfilename( + initialfile=f"dlp_compliance_policies_{ts}.csv", + defaultextension=".csv", + filetypes=[("CSV Files", "*.csv"), ("All Files", "*.*")], + parent=self + ) + if not f: + return + + policies_list = self.last_dlp_policies_data if isinstance(self.last_dlp_policies_data, list) else [self.last_dlp_policies_data] + + rows = [] + for policy in policies_list: + workloads_str = format_dlp_workloads(policy) + thick_client_status = analyze_thick_client_enforcement(policy) + + # Format rules details + rules = policy.get("Rules", []) + rules_details = [] + for r in rules: + actions_str = ", ".join(r.get("Actions", [])) + rules_details.append(f"Rule: {r.get('Name')} (Enabled: {r.get('Enabled')}) - Actions: [{actions_str}]") + rules_str = "\n".join(rules_details) + + rows.append({ + "Policy Name": policy.get("Name", "N/A"), + "Identity": policy.get("Identity", "N/A"), + "Description / Comment": policy.get("Comment", "N/A"), + "Workloads": workloads_str, + "Mode": policy.get("Mode", "N/A"), + "Distribution Status": policy.get("DistributionStatus", "N/A"), + "Is Enabled": policy.get("Enabled", True), + "Thick Client DLP Enforcement": thick_client_status, + "Exchange Locations": ", ".join(policy.get("ExchangeLocation") or []), + "SharePoint Locations": ", ".join(policy.get("SharePointLocation") or []), + "OneDrive Locations": ", ".join(policy.get("OneDriveLocation") or []), + "Teams Locations": ", ".join(policy.get("TeamsLocation") or []), + "Devices Locations": ", ".join(policy.get("DevicesLocation") or []), + "Rules Configured": rules_str, + "When Created": policy.get("WhenCreated", "N/A"), + "When Changed": policy.get("WhenChanged", "N/A"), + "Created By": policy.get("CreatedBy", "N/A"), + "Last Modified By": policy.get("LastModifiedBy", "N/A") + }) + + df = pd.DataFrame(rows) + try: + df.to_csv(f, index=False) + messagebox.showinfo("Export Successful", f"DLP compliance policies exported successfully to:\n{f}", parent=self) + except Exception as e: + messagebox.showerror("Export Failed", f"Failed to export CSV: {e}", parent=self)