@@ -19,6 +19,7 @@ use prost_types::Any;
1919use std:: cmp;
2020use std:: collections:: { HashMap , HashSet } ;
2121use std:: fmt;
22+ use std:: hash:: Hash ;
2223use thiserror:: Error ;
2324
2425use graph:: prelude:: * ;
@@ -351,15 +352,113 @@ pub struct EthereumLogFilter {
351352 /// Log filters can be represented as a bipartite graph between contracts and events. An edge
352353 /// exists between a contract and an event if a data source for the contract has a trigger for
353354 /// the event.
354- /// Edges are of `bool` type and indicates when a trigger requires a transaction receipt.
355- contracts_and_events_graph : GraphMap < LogFilterNode , bool , petgraph :: Undirected > ,
355+ /// Edge weights are booleans indicating whether the trigger requires a transaction receipt.
356+ contracts_and_events_graph : MergeGraph ,
356357
357358 /// Event sigs with no associated address, matching on all addresses.
358- /// Maps to a boolean representing if a trigger requires a transaction receipt.
359- wildcard_events : HashMap < EventSignature , bool > ,
360- /// Events with any of the topic filters set
361- /// Maps to a boolean representing if a trigger requires a transaction receipt.
362- events_with_topic_filters : HashMap < EventSignatureWithTopics , bool > ,
359+ /// Values are booleans indicating whether the trigger requires a transaction receipt.
360+ wildcard_events : MergeMap < EventSignature > ,
361+ /// Events with any of the topic filters set.
362+ /// Values are booleans indicating whether the trigger requires a transaction receipt.
363+ events_with_topic_filters : MergeMap < EventSignatureWithTopics > ,
364+ }
365+
366+ /// `HashMap<K, bool>` wrapper whose values are OR-merged on every write.
367+ ///
368+ /// The only mutator that writes values is [`MergeMap::or_insert`] — the inner
369+ /// `HashMap` is private so callers cannot bypass the merge via
370+ /// `HashMap::insert`. Used by `EthereumLogFilter` to track per-key receipt
371+ /// requirements where any handler asking for a receipt at a given key forces
372+ /// receipt fetching.
373+ #[ derive( Clone , Debug ) ]
374+ struct MergeMap < K : Eq + Hash > ( HashMap < K , bool > ) ;
375+
376+ impl < K : Eq + Hash > Default for MergeMap < K > {
377+ fn default ( ) -> Self {
378+ Self ( HashMap :: new ( ) )
379+ }
380+ }
381+
382+ impl < K : Eq + Hash > MergeMap < K > {
383+ fn or_insert ( & mut self , k : K , v : bool ) {
384+ self . 0 . entry ( k) . and_modify ( |e| * e |= v) . or_insert ( v) ;
385+ }
386+
387+ fn get ( & self , k : & K ) -> Option < & bool > {
388+ self . 0 . get ( k)
389+ }
390+
391+ fn contains_key ( & self , k : & K ) -> bool {
392+ self . 0 . contains_key ( k)
393+ }
394+
395+ fn iter ( & self ) -> impl Iterator < Item = ( & K , & bool ) > + ' _ {
396+ self . 0 . iter ( )
397+ }
398+
399+ fn is_empty ( & self ) -> bool {
400+ self . 0 . is_empty ( )
401+ }
402+ }
403+
404+ impl < K : Eq + Hash > IntoIterator for MergeMap < K > {
405+ type Item = ( K , bool ) ;
406+ type IntoIter = std:: collections:: hash_map:: IntoIter < K , bool > ;
407+ fn into_iter ( self ) -> Self :: IntoIter {
408+ self . 0 . into_iter ( )
409+ }
410+ }
411+
412+ /// `GraphMap<LogFilterNode, bool, _>` wrapper that OR-merges edge weights on
413+ /// every write.
414+ ///
415+ /// The only mutator that writes edge weights is [`MergeGraph::or_add_edge`] —
416+ /// the inner `GraphMap` is private so callers cannot bypass the merge via
417+ /// `GraphMap::add_edge`.
418+ #[ derive( Clone , Debug ) ]
419+ struct MergeGraph ( GraphMap < LogFilterNode , bool , petgraph:: Undirected > ) ;
420+
421+ impl Default for MergeGraph {
422+ fn default ( ) -> Self {
423+ Self ( GraphMap :: new ( ) )
424+ }
425+ }
426+
427+ impl MergeGraph {
428+ fn or_add_edge ( & mut self , a : LogFilterNode , b : LogFilterNode , v : bool ) {
429+ // Short-circuit on `v == true`: a `true` weight always wins over any
430+ // prior weight, so we can skip the edge_weight lookup entirely.
431+ let merged = v || self . 0 . edge_weight ( a, b) . copied ( ) . unwrap_or ( false ) ;
432+ self . 0 . add_edge ( a, b, merged) ;
433+ }
434+
435+ fn edge_weight ( & self , a : LogFilterNode , b : LogFilterNode ) -> Option < & bool > {
436+ self . 0 . edge_weight ( a, b)
437+ }
438+
439+ fn contains_edge ( & self , a : LogFilterNode , b : LogFilterNode ) -> bool {
440+ self . 0 . contains_edge ( a, b)
441+ }
442+
443+ fn edge_count ( & self ) -> usize {
444+ self . 0 . edge_count ( )
445+ }
446+
447+ fn nodes ( & self ) -> impl Iterator < Item = LogFilterNode > + ' _ {
448+ self . 0 . nodes ( )
449+ }
450+
451+ fn neighbors ( & self , n : LogFilterNode ) -> impl Iterator < Item = LogFilterNode > + ' _ {
452+ self . 0 . neighbors ( n)
453+ }
454+
455+ fn remove_node ( & mut self , n : LogFilterNode ) -> bool {
456+ self . 0 . remove_node ( n)
457+ }
458+
459+ fn all_edges ( & self ) -> impl Iterator < Item = ( LogFilterNode , LogFilterNode , & bool ) > {
460+ self . 0 . all_edges ( )
461+ }
363462}
364463
365464impl From < EthereumLogFilter > for Vec < LogFilter > {
@@ -458,14 +557,14 @@ impl EthereumLogFilter {
458557 let event_sig = event_handler. topic0 ( ) ;
459558 match ds. address {
460559 Some ( contract) if !event_handler. has_additional_topics ( ) => {
461- this. contracts_and_events_graph . add_edge (
560+ this. contracts_and_events_graph . or_add_edge (
462561 LogFilterNode :: Contract ( contract) ,
463562 LogFilterNode :: Event ( event_sig) ,
464563 event_handler. receipt ,
465564 ) ;
466565 }
467566 Some ( contract) => {
468- this. events_with_topic_filters . insert (
567+ this. events_with_topic_filters . or_insert (
469568 EventSignatureWithTopics :: new (
470569 Some ( contract) ,
471570 event_sig,
@@ -479,11 +578,11 @@ impl EthereumLogFilter {
479578
480579 None if ( !event_handler. has_additional_topics ( ) ) => {
481580 this. wildcard_events
482- . insert ( event_sig, event_handler. receipt ) ;
581+ . or_insert ( event_sig, event_handler. receipt ) ;
483582 }
484583
485584 None => {
486- this. events_with_topic_filters . insert (
585+ this. events_with_topic_filters . or_insert (
487586 EventSignatureWithTopics :: new (
488587 ds. address ,
489588 event_sig,
@@ -505,12 +604,18 @@ impl EthereumLogFilter {
505604 for event_handler in & mapping. event_handlers {
506605 let signature = event_handler. topic0 ( ) ;
507606 this. wildcard_events
508- . insert ( signature, event_handler. receipt ) ;
607+ . or_insert ( signature, event_handler. receipt ) ;
509608 }
510609 this
511610 }
512611
513612 /// Extends this log filter with another one.
613+ ///
614+ /// The `receipt` flag stored at each filter key is OR-combined across the
615+ /// two filters: a key whose receipt requirement is `true` in either filter
616+ /// stays `true` in the merged result. Overwriting with the incoming value
617+ /// would drop a prior `true` and silently downgrade receipt fetching for
618+ /// any handler that declared `receipt: true` at that key.
514619 pub fn extend ( & mut self , other : EthereumLogFilter ) {
515620 if other. is_empty ( ) {
516621 return ;
@@ -523,11 +628,14 @@ impl EthereumLogFilter {
523628 events_with_topic_filters,
524629 } = other;
525630 for ( s, t, e) in contracts_and_events_graph. all_edges ( ) {
526- self . contracts_and_events_graph . add_edge ( s, t, * e) ;
631+ self . contracts_and_events_graph . or_add_edge ( s, t, * e) ;
632+ }
633+ for ( k, v) in wildcard_events {
634+ self . wildcard_events . or_insert ( k, v) ;
635+ }
636+ for ( k, v) in events_with_topic_filters {
637+ self . events_with_topic_filters . or_insert ( k, v) ;
527638 }
528- self . wildcard_events . extend ( wildcard_events) ;
529- self . events_with_topic_filters
530- . extend ( events_with_topic_filters) ;
531639 }
532640
533641 /// An empty filter is one that never matches.
@@ -555,15 +663,15 @@ impl EthereumLogFilter {
555663 // Start with the wildcard event filters.
556664 filters. extend (
557665 self . wildcard_events
558- . into_keys ( )
559- . map ( EthGetLogsFilter :: from_event) ,
666+ . into_iter ( )
667+ . map ( | ( k , _ ) | EthGetLogsFilter :: from_event ( k ) ) ,
560668 ) ;
561669
562670 // Handle events with topic filters.
563671 filters. extend (
564672 self . events_with_topic_filters
565- . into_keys ( )
566- . map ( EthGetLogsFilter :: from_event_with_topics) ,
673+ . into_iter ( )
674+ . map ( | ( k , _ ) | EthGetLogsFilter :: from_event_with_topics ( k ) ) ,
567675 ) ;
568676
569677 // The current algorithm is to repeatedly find the maximum cardinality vertex and turn all
@@ -1194,7 +1302,6 @@ mod tests {
11941302 use base64:: prelude:: * ;
11951303 use graph:: blockchain:: TriggerFilter as _;
11961304 use graph:: firehose:: { CallToFilter , CombinedFilter , LogFilter , MultiLogFilter } ;
1197- use graph:: petgraph:: graphmap:: GraphMap ;
11981305 use graph:: prelude:: EthereumCall ;
11991306 use graph:: prelude:: alloy:: primitives:: { Address , B256 , Bytes , U256 } ;
12001307 use hex:: ToHex ;
@@ -1284,11 +1391,7 @@ mod tests {
12841391 fn ethereum_trigger_filter_to_firehose ( ) {
12851392 let sig = |value : u64 | B256 :: from ( U256 :: from ( value) ) ;
12861393 let mut filter = TriggerFilter {
1287- log : EthereumLogFilter {
1288- contracts_and_events_graph : GraphMap :: new ( ) ,
1289- wildcard_events : HashMap :: new ( ) ,
1290- events_with_topic_filters : HashMap :: new ( ) ,
1291- } ,
1394+ log : EthereumLogFilter :: default ( ) ,
12921395 call : EthereumCallFilter {
12931396 contract_addresses_function_signatures : HashMap :: from_iter ( vec ! [
12941397 ( address( 0 ) , ( 0 , HashSet :: from_iter( vec![ [ 0u8 ; 4 ] ] ) ) ) ,
@@ -1337,17 +1440,17 @@ mod tests {
13371440 } ,
13381441 ] ;
13391442
1340- filter. log . contracts_and_events_graph . add_edge (
1443+ filter. log . contracts_and_events_graph . or_add_edge (
13411444 LogFilterNode :: Contract ( address ( 10 ) ) ,
13421445 LogFilterNode :: Event ( sig ( 100 ) ) ,
13431446 false ,
13441447 ) ;
1345- filter. log . contracts_and_events_graph . add_edge (
1448+ filter. log . contracts_and_events_graph . or_add_edge (
13461449 LogFilterNode :: Contract ( address ( 10 ) ) ,
13471450 LogFilterNode :: Event ( sig ( 101 ) ) ,
13481451 false ,
13491452 ) ;
1350- filter. log . contracts_and_events_graph . add_edge (
1453+ filter. log . contracts_and_events_graph . or_add_edge (
13511454 LogFilterNode :: Contract ( address ( 20 ) ) ,
13521455 LogFilterNode :: Event ( sig ( 100 ) ) ,
13531456 false ,
@@ -1407,11 +1510,7 @@ mod tests {
14071510 let address = |value : u64 | Address :: left_padding_from ( & value. to_le_bytes ( ) ) ;
14081511 let sig = |value : u64 | B256 :: left_padding_from ( & value. to_le_bytes ( ) ) ;
14091512 let mut filter = TriggerFilter {
1410- log : EthereumLogFilter {
1411- contracts_and_events_graph : GraphMap :: new ( ) ,
1412- wildcard_events : HashMap :: new ( ) ,
1413- events_with_topic_filters : HashMap :: new ( ) ,
1414- } ,
1513+ log : EthereumLogFilter :: default ( ) ,
14151514 call : EthereumCallFilter {
14161515 contract_addresses_function_signatures : HashMap :: new ( ) ,
14171516 wildcard_signatures : HashSet :: new ( ) ,
@@ -1423,7 +1522,7 @@ mod tests {
14231522 } ,
14241523 } ;
14251524
1426- filter. log . contracts_and_events_graph . add_edge (
1525+ filter. log . contracts_and_events_graph . or_add_edge (
14271526 LogFilterNode :: Contract ( address ( 10 ) ) ,
14281527 LogFilterNode :: Event ( sig ( 101 ) ) ,
14291528 false ,
@@ -1759,10 +1858,10 @@ fn complete_log_filter() {
17591858 let contracts: BTreeSet < _ > = ( 0 ..j) . map ( |n| Address :: from ( [ n as u8 ; 20 ] ) ) . collect ( ) ;
17601859
17611860 // Construct the complete bipartite graph with i events and j contracts.
1762- let mut contracts_and_events_graph = GraphMap :: new ( ) ;
1861+ let mut filter = EthereumLogFilter :: default ( ) ;
17631862 for & contract in & contracts {
17641863 for & event in & events {
1765- contracts_and_events_graph. add_edge (
1864+ filter . contracts_and_events_graph . or_add_edge (
17661865 LogFilterNode :: Contract ( contract) ,
17671866 LogFilterNode :: Event ( event) ,
17681867 false ,
@@ -1771,13 +1870,9 @@ fn complete_log_filter() {
17711870 }
17721871
17731872 // Run `eth_get_logs_filters`, which is what we want to test.
1774- let logs_filters: Vec < _ > = EthereumLogFilter {
1775- contracts_and_events_graph,
1776- wildcard_events : HashMap :: new ( ) ,
1777- events_with_topic_filters : HashMap :: new ( ) ,
1778- }
1779- . eth_get_logs_filters ( ENV_VARS . get_logs_max_contracts )
1780- . collect ( ) ;
1873+ let logs_filters: Vec < _ > = filter
1874+ . eth_get_logs_filters ( ENV_VARS . get_logs_max_contracts )
1875+ . collect ( ) ;
17811876
17821877 // Assert that a contract or event is filtered on iff it was present in the graph.
17831878 assert_eq ! (
@@ -1842,14 +1937,6 @@ fn log_filter_require_transacion_receipt_method() {
18421937
18431938 let wildcard_event_with_receipt = b256 ( 6 ) ;
18441939 let wildcard_event_without_receipt = b256 ( 7 ) ;
1845- let wildcard_events = [
1846- ( wildcard_event_with_receipt, true ) ,
1847- ( wildcard_event_without_receipt, false ) ,
1848- ]
1849- . into_iter ( )
1850- . collect ( ) ;
1851-
1852- let events_with_topic_filters = HashMap :: new ( ) ;
18531940
18541941 let alien_event_signature = b256 ( 8 ) ; // those will not be inserted in the graph
18551942 let alien_contract_address = address ( 9 ) ;
@@ -1877,25 +1964,29 @@ fn log_filter_require_transacion_receipt_method() {
18771964 // event_b -- contract_a [ receipt=false ]
18781965 // }
18791966 // ```
1880- let mut contracts_and_events_graph = GraphMap :: new ( ) ;
1881-
1882- let event_a_id = contracts_and_events_graph. add_node ( event_a_node) ;
1883- let event_b_id = contracts_and_events_graph. add_node ( event_b_node) ;
1884- let event_c_id = contracts_and_events_graph. add_node ( event_c_node) ;
1885- let contract_a_id = contracts_and_events_graph. add_node ( contract_a_node) ;
1886- let contract_b_id = contracts_and_events_graph. add_node ( contract_b_node) ;
1887- let contract_c_id = contracts_and_events_graph. add_node ( contract_c_node) ;
1888- contracts_and_events_graph. add_edge ( event_a_id, contract_a_id, true ) ;
1889- contracts_and_events_graph. add_edge ( event_b_id, contract_b_id, true ) ;
1890- contracts_and_events_graph. add_edge ( event_a_id, contract_b_id, false ) ;
1891- contracts_and_events_graph. add_edge ( event_b_id, contract_a_id, false ) ;
1892- contracts_and_events_graph. add_edge ( event_c_id, contract_c_id, true ) ;
1893-
1894- let filter = EthereumLogFilter {
1895- contracts_and_events_graph,
1896- wildcard_events,
1897- events_with_topic_filters,
1898- } ;
1967+ // TODO(krishna): Test events with topic filters
1968+ let mut filter = EthereumLogFilter :: default ( ) ;
1969+ filter
1970+ . contracts_and_events_graph
1971+ . or_add_edge ( event_a_node, contract_a_node, true ) ;
1972+ filter
1973+ . contracts_and_events_graph
1974+ . or_add_edge ( event_b_node, contract_b_node, true ) ;
1975+ filter
1976+ . contracts_and_events_graph
1977+ . or_add_edge ( event_a_node, contract_b_node, false ) ;
1978+ filter
1979+ . contracts_and_events_graph
1980+ . or_add_edge ( event_b_node, contract_a_node, false ) ;
1981+ filter
1982+ . contracts_and_events_graph
1983+ . or_add_edge ( event_c_node, contract_c_node, true ) ;
1984+ filter
1985+ . wildcard_events
1986+ . or_insert ( wildcard_event_with_receipt, true ) ;
1987+ filter
1988+ . wildcard_events
1989+ . or_insert ( wildcard_event_without_receipt, false ) ;
18991990
19001991 let empty_vec: Vec < B256 > = vec ! [ ] ;
19011992
0 commit comments