@@ -141,40 +141,29 @@ async def shutdown(self):
141141
142142 @_background_task (interval = 10 , lock = "hub_stats" , lock_expire = 10 )
143143 async def _fetch_hub_stats (self ):
144- print (f"[_fetch_hub_stats] Fetching from { STATS_URL } " )
145144 try :
146145 async with self ._session .get (STATS_URL ) as r :
147- print (f"[_fetch_hub_stats] Status: { r .status } " )
148146 if r .status == 200 :
149147 data = await r .json ()
150- aircraft_count = data .get ("aircraft_with_pos" )
151- print (f"[_fetch_hub_stats] aircraft_with_pos: { aircraft_count } " )
152- await self .redis .set (REDIS_KEY_HUB_AIRCRAFT , aircraft_count , ex = 15 )
153- print (f"[_fetch_hub_stats] Set Redis key { REDIS_KEY_HUB_AIRCRAFT } " )
148+ await self .redis .set (REDIS_KEY_HUB_AIRCRAFT , data .get ("aircraft_with_pos" ), ex = 15 )
154149 except Exception as e :
155150 print (f"[_fetch_hub_stats] Error: { e } " )
156151 traceback .print_exc ()
157152
158153 @_background_task (interval = 5 , lock = "ingest" , lock_expire = 8 )
159154 async def _fetch_ingest (self ):
160- print (f"[_fetch_ingest] Resolving { INGEST_DNS } " )
161155 try :
162- print (f"[_fetch_ingest] resolver={ self .resolver } , session={ self ._session } " )
163156 ips = [x .host for x in await self .resolver .query (INGEST_DNS , "A" )]
164- print (f"[_fetch_ingest] Resolved IPs: { ips } , fetching from { len (ips )} servers" )
165157 results = await asyncio .gather (* (self ._fetch_one (ip ) for ip in ips ))
166- print (f"[_fetch_ingest] Gather results: { results } " )
167158
168159 clients , receivers = [], []
169160 for r in results :
170161 if r :
171162 clients .extend (r .get ("clients" , []))
172163 receivers .extend (r .get ("receivers" , []))
173164
174- print (f"[_fetch_ingest] Got { len (clients )} clients, { len (receivers )} receivers" )
175165 await self .redis .set (REDIS_KEY_BEAST_CLIENTS , orjson .dumps (self ._dedupe (clients )), ex = 15 )
176166 await self .redis .set (REDIS_KEY_BEAST_RECEIVERS , orjson .dumps (receivers ), ex = 15 )
177- print (f"[_fetch_ingest] Set Redis keys" )
178167 except Exception as e :
179168 print (f"[_fetch_ingest] Error: { e } " )
180169 traceback .print_exc ()
@@ -203,7 +192,6 @@ async def get_receivers():
203192 await asyncio .gather (get_clients (), get_receivers ())
204193 for r in receivers or []:
205194 r [8 ], r [9 ] = round (r [8 ], 1 ), round (r [9 ], 1 )
206- print (f"[_fetch_one { ip } ] Got { len (clients or [])} clients, { len (receivers or [])} receivers" )
207195 return {"clients" : clients , "receivers" : receivers }
208196 except Exception as e :
209197 print (f"[_fetch_one { ip } ] Error: { e } " )
@@ -231,21 +219,16 @@ async def fetch(srv):
231219 async with self ._session .get (f"http://{ srv } :150/sync.json" , timeout = aiohttp .ClientTimeout (total = 10 )) as r :
232220 if r .status == 200 :
233221 data [sv ] = {n : {"lat" : v ["lat" ], "lon" : v ["lon" ], "bad_syncs" : v .get ("bad_syncs" , - 1 ), "peers" : {_salty (p , SALT_MLAT ): pv for p , pv in v .get ("peers" , {}).items ()}} for n , v in (await r .json ()).items ()}
234- print (f"[_fetch_mlat] Fetched sync from { sv } : { len (data [sv ])} entries" )
235222 async with self ._session .get (f"http://{ srv } :150/clients.json" , timeout = aiohttp .ClientTimeout (total = 10 )) as r :
236223 if r .status == 200 :
237224 clients [sv ] = await r .json ()
238- print (f"[_fetch_mlat] Fetched clients from { sv } " )
239225 except Exception as e :
240226 print (f"[_fetch_mlat] Error fetching from { srv } : { e } " )
241227
242- print (f"[_fetch_mlat] Fetching from { MLAT_SERVERS } " )
243228 await asyncio .gather (* (fetch (s ) for s in MLAT_SERVERS ))
244- print (f"[_fetch_mlat] Got data from { len (data )} servers, clients from { len (clients )} servers" )
245229 await self .redis .set (REDIS_KEY_MLAT_SYNC , orjson .dumps (data ), ex = 15 )
246230 await self .redis .set (REDIS_KEY_MLAT_CLIENTS , orjson .dumps (clients ), ex = 15 )
247231 await self .redis .set (REDIS_KEY_MLAT_TOTALCOUNT , orjson .dumps ({"UPDATED" : datetime .now ().strftime ("%a %b %d %H:%M:%S UTC %Y" ), ** {sv : [len (d ), 1337 , 0 ] for sv , d in data .items ()}}), ex = 15 )
248- print (f"[_fetch_mlat] Set Redis keys" )
249232
250233 async def get_clients_per_client_ip (self , ip : str ) -> list :
251234 clients = await self ._json_get (REDIS_KEY_BEAST_CLIENTS ) or []
@@ -290,20 +273,14 @@ async def dispatch_background_task(self):
290273
291274 @_background_task (interval = 60 , lock = "vrs_csv" , lock_expire = 3600 , success_interval = 3600 )
292275 async def _loop (self ):
293- print ("[RedisVRS._loop] Starting CSV fetch" )
294276 for name , url in (("route" , "https://vrs-standing-data.adsb.lol/routes.csv.gz" ), ("airport" , "https://vrs-standing-data.adsb.lol/airports.csv.gz" )):
295277 try :
296278 async with self ._session .get (url ) as r :
297279 if r .status == 200 :
298280 pipe = self .redis .pipeline ()
299- count = 0
300281 for row in gzip .decompress (await r .read ()).decode ().splitlines ():
301282 pipe .set (f"vrs:{ name } :{ row .split (',' )[0 ]} " , row )
302- count += 1
303283 await pipe .execute ()
304- print (f"[RedisVRS._loop] Fetched { name } : { count } rows" )
305- else :
306- print (f"[RedisVRS._loop] Failed to fetch { name } : status { r .status } " )
307284 except Exception as e :
308285 print (f"[RedisVRS._loop] Error fetching { name } : { e } " )
309286 traceback .print_exc ()
@@ -386,9 +363,7 @@ async def dispatch_background_task(self):
386363 async def _loop (self ):
387364 try :
388365 async with asyncio .timeout (10 ):
389- print ("[FeederData._loop] Resolving ingest DNS" )
390366 ips = [x .host for x in await self ._resolver .query (INGEST_DNS , "A" )]
391- print (f"[FeederData._loop] Resolved IPs: { ips } " )
392367 results = await asyncio .gather (* (self ._fetch (ip ) for ip in ips ), return_exceptions = True )
393368 pipe , recv_ingest = self .redis .pipeline (), {}
394369
@@ -398,8 +373,6 @@ async def _loop(self):
398373 continue
399374 if not data :
400375 continue
401- aircraft_count = len (data .get ("aircraft" , []))
402- print (f"[FeederData._loop] Got { aircraft_count } aircraft from { ip } " )
403376 for ac in data .get ("aircraft" , []):
404377 for r in ac .get ("recentReceiverIds" , []):
405378 recv_ingest [r ] = ip
@@ -411,7 +384,6 @@ async def _loop(self):
411384 pipe .expire (f"receiver_ac:{ r } " , 30 )
412385 pipe .set (f"receiver_ingest:{ r } " , ip , ex = 30 )
413386 await pipe .execute ()
414- print (f"[FeederData._loop] Updated { len (recv_ingest )} receivers" )
415387 except Exception as e :
416388 print (f"[FeederData._loop] Error: { e } " )
417389 traceback .print_exc ()
0 commit comments