Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion slips/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ def get_analyzed_flows_percentage(self) -> str:
if not hasattr(self, "total_flows"):
self.total_flows = self.db.get_total_flows()

processed = self.db.get_flow_analyzed_by_the_profiler_so_far()
processed = self.db.get_flows_analyzed_by_the_profiler_so_far()
if not processed:
return ""
try:
Expand Down
5 changes: 4 additions & 1 deletion slips_files/core/database/database_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,9 @@ def get_disabled_modules(self, *args, **kwargs):
def increment_profiler_workers_started(self, *args, **kwargs):
return self.rdb.increment_profiler_workers_started(*args, **kwargs)

def decrement_profiler_workers_started(self, *args, **kwargs):
return self.rdb.decrement_profiler_workers_started(*args, **kwargs)

def get_profiler_workers_started(self, *args, **kwargs):
return self.rdb.get_profiler_workers_started(*args, **kwargs)

Expand Down Expand Up @@ -1149,7 +1152,7 @@ def get_info_about_icmp_flows_using_sport(self, *args, **kwargs):
def increment_processed_flows(self, *args, **kwargs):
return self.rdb.increment_processed_flows(*args, **kwargs)

def get_flow_analyzed_by_the_profiler_so_far(self, *args, **kwargs):
def get_flows_analyzed_by_the_profiler_so_far(self, *args, **kwargs):
return self.rdb.get_flow_analyzed_by_the_profiler_so_far(
*args, **kwargs
)
Expand Down
3 changes: 3 additions & 0 deletions slips_files/core/database/redis_db/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -2008,6 +2008,9 @@ def increment_profiler_workers_started(self) -> int:
"""increments the number of profiler workers started"""
return self.r.incr(self.constants.PROFILER_WORKERS_STARTED, 1)

def decrement_profiler_workers_started(self) -> int:
return self.r.incr(self.constants.PROFILER_WORKERS_STARTED, -1)

def get_profiler_workers_started(self) -> int:
"""returns number of profiler workers started so far"""
count = self.r.get(self.constants.PROFILER_WORKERS_STARTED)
Expand Down
140 changes: 4 additions & 136 deletions slips_files/core/profiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
import queue
import multiprocessing
import time
import threading
from multiprocessing import Process
from multiprocessing.synchronize import Event, Semaphore
from typing import (
List,
Expand All @@ -42,7 +40,7 @@
from slips_files.core.input_profilers.nfdump import Nfdump
from slips_files.core.input_profilers.suricata import Suricata
from slips_files.core.input_profilers.zeek import ZeekJSON, ZeekTabs
from slips_files.core.profiler_worker import ProfilerWorker
from slips_files.core.worker_manager_mixin import WorkerManagerMixin

SUPPORTED_INPUT_TYPES = {
InputType.ZEEK: ZeekJSON,
Expand All @@ -62,7 +60,7 @@
}


class Profiler(ICore, IObservable):
class Profiler(WorkerManagerMixin, ICore, IObservable):
"""A class to create the profiles for IPs"""

name = "profiler"
Expand Down Expand Up @@ -107,16 +105,7 @@ def init(
# is set by input to indicate it stopped because of a failure
self.is_input_failed_event: Optional[Event] = is_input_failed_event
self.input_handler_obj = None
# to close them on shutdown
self.profiler_child_processes: List[Process] = []
# to access their internal attributes if needed
self.workers: List[ProfilerWorker] = []
# is set by this module to indicate to the monitor thread that
# workers stoppped.
self.did_all_workers_stop: Event = multiprocessing.Event()
self.last_worker_id = -1
# max parallel profiler workers to start when high throughput is detected
self.max_workers = 6
self.init_worker_manager()
# 30MBs max size of this queue to avoid growing forever in mem
self.aid_queue = multiprocessing.Queue(maxsize=30000000)
# This starts a process that handles calculatng aid hash and stores
Expand All @@ -126,13 +115,6 @@ def init(
self.db,
self.aid_queue,
)
now = time.monotonic()
self.next_throughput_check_time = now + 300
self.profiler_monitor_thread = threading.Thread(
target=self._run_profiler_monitor_loop,
name="profiler_monitor_loop",
daemon=True,
)

def subscribe_to_channels(self):
self.channels = {}
Expand Down Expand Up @@ -176,23 +158,6 @@ def get_input_type(self, line: dict, input_type: str) -> str:
# binetflow, binetflow tabs, nfdump, suricata
return input_type

def stop_profiler_workers(self):
"""
wait as long as needed foreach worker to stop
"""
# ensure we don't block forever waiting for workers that will never
# receive the stop sentinel
if self.is_input_done_event is not None:
self.is_input_done_event.wait()

for process in self.profiler_child_processes:
try:
process.join()
except (OSError, ChildProcessError):
pass

self.did_all_workers_stop.set()

def mark_self_as_done_processing(self) -> None:
"""
is called to mark this process as done processing so
Expand Down Expand Up @@ -223,30 +188,6 @@ def get_msg_from_queue(self, q: multiprocessing.Queue):
except Exception:
return None

def start_profiler_worker(self, worker_id: int = None):
"""starts A profiler worker for faster processing of the flows"""
worker_name = f"profiler_worker_process_{worker_id}"
worker = ProfilerWorker(
logger=self.logger,
output_dir=self.parent_output_dir,
redis_port=self.redis_port,
termination_event=self.termination_event,
conf=self.conf,
ppid=self.ppid,
slips_args=self.args,
bloom_filters_manager=self.bloom_filters,
# module specific kwargs
name=worker_name,
profiler_queue=self.profiler_queue,
input_handler=self.input_handler_obj,
aid_queue=self.aid_queue,
aid_manager=self.aid_manager,
is_input_done_event=self.is_input_done_event,
)
worker.start()
self.profiler_child_processes.append(worker)
self.db.increment_profiler_workers_started()

def get_handler_obj(
self, first_msg: dict
) -> ZeekTabs | ZeekJSON | Argus | Suricata | ZeekTabs | Nfdump:
Expand Down Expand Up @@ -307,70 +248,11 @@ def shutdown_gracefully(self):
self.mark_self_as_done_processing()
self.db.set_new_incoming_flows(False)

def did_5min_pass_since_last_throughput_check(self) -> bool:
"""
returns true if 5 mins passed since the last time we checked
the flows read per second
"""
now = time.monotonic()
if now < self.next_throughput_check_time:
return False

# Advance in 5-min steps to reduce drift on long delays.
while self.next_throughput_check_time <= now:
self.next_throughput_check_time += 300
return True

def max_workers_started(self) -> bool:
"""
returns true if the maximum number of profiler workers
is already started
"""
# bc workers start from 0
if self.last_worker_id + 1 >= self.max_workers:
return True
return False

def _check_if_high_throughput_and_add_workers(self):
"""
Checks for input and profile flows/sec imbalance and adds more
profiler workers if needed.
"""
if self.max_workers_started():
return

if not self.did_5min_pass_since_last_throughput_check():
return

profiler_fps = self.db.get_core_module_flows_per_second(self.name) or 0
input_fps = self.db.get_core_module_flows_per_second("Input") or 0
if float(input_fps) > (
float(profiler_fps) * 1.1
): # 10% more input fps than profiler fps
worker_id = self.last_worker_id + 1
self.start_profiler_worker(worker_id)
self.last_worker_id = worker_id
self.print(
f"Warning: High throughput detected. Started "
f"additional worker: "
f"profiler_worker_{worker_id} to handle the flows."
)

if self.last_worker_id == self.max_workers - 1:
self.print(
f"Maximum number of profiler workers "
f"({self.max_workers}) started."
)

def pre_main(self):
client_ips = [str(ip) for ip in self.client_ips]
if client_ips:
self.print(f"Used client IPs: {green(', '.join(client_ips))}")

def _update_lines_read_by_all_workers(self):
# needed by store_flows_read_per_second()
self.lines = sum([worker.received_lines for worker in self.workers])

def should_stop(self):
"""
overrides IModule.should_stop().
Expand All @@ -381,18 +263,6 @@ def should_stop(self):
"""
return self.stop_other_workers.is_set()

def _run_profiler_monitor_loop(self):
"""
Does necessary monitoring and stats updating for the profiler while
the workers are
running.
"""
while not self.did_all_workers_stop.is_set():
self._update_lines_read_by_all_workers()
# implemented in icore.py
self.store_flows_read_per_second()
self._check_if_high_throughput_and_add_workers()

def _is_input_done(self) -> bool:
return (
self.is_input_done_event is not None
Expand Down Expand Up @@ -443,7 +313,6 @@ def main(self):
self.db
)
else:

self.input_handler_obj = self.get_handler_obj(msg)
# put again that msg in queue to be processed by the profilers,
# we just checked it here to determine the input handler obj
Expand All @@ -461,8 +330,7 @@ def main(self):

# slips starts with these workers by default until it detects
# high throughput that these workers arent enough to handle
num_of_initial_profiler_workers = 3
for worker_id in range(num_of_initial_profiler_workers):
for worker_id in range(self.num_of_initial_profiler_workers):
self.last_worker_id = worker_id
self.start_profiler_worker(worker_id)

Expand Down
Loading
Loading