Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions yarn-project/bootstrap.sh
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ function bench_cmds {
echo "$hash BENCH_OUTPUT=bench-out/native_world_state.bench.json yarn-project/scripts/run_test.sh world-state/src/native/native_bench.test.ts"
echo "$hash BENCH_OUTPUT=bench-out/kv_store.bench.json yarn-project/scripts/run_test.sh kv-store/src/bench/map_bench.test.ts"
echo "$hash BENCH_OUTPUT=bench-out/tx_pool.bench.json yarn-project/scripts/run_test.sh p2p/src/mem_pools/tx_pool/tx_pool_bench.test.ts"
echo "$hash BENCH_OUTPUT=bench-out/p2p_client_proposal_tx_collector.bench.json yarn-project/scripts/run_test.sh p2p/src/client/test/tx_proposal_collector/p2p_client.proposal_tx_collector_bench.test.ts"
echo "$hash BENCH_OUTPUT=bench-out/tx.bench.json yarn-project/scripts/run_test.sh stdlib/src/tx/tx_bench.test.ts"
echo "$hash:ISOLATE=1:CPUS=10:MEM=16g:LOG_LEVEL=silent BENCH_OUTPUT=bench-out/proving_broker.bench.json yarn-project/scripts/run_test.sh prover-client/src/test/proving_broker_testbench.test.ts"
echo "$hash:ISOLATE=1:CPUS=16:MEM=16g BENCH_OUTPUT=bench-out/avm_bulk_test.bench.json yarn-project/scripts/run_test.sh bb-prover/src/avm_proving_tests/avm_bulk.test.ts"
Expand Down
5 changes: 5 additions & 0 deletions yarn-project/foundation/src/queue/semaphore.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
import { FifoMemoryQueue } from './fifo_memory_queue.js';

export interface ISemaphore {
acquire(): Promise<void>;
release(): void;
}

/**
* Allows the acquiring of up to `size` tokens before calls to acquire block, waiting for a call to release().
*/
Expand Down
2 changes: 1 addition & 1 deletion yarn-project/p2p/src/client/factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ export async function createP2PClient<T extends P2PClientType>(
}

const txCollection = new TxCollection(
p2pService,
p2pService.getBatchTxRequesterService(),
nodeSources,
l1Constants,
mempools.txPool,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
import type { EpochCache } from '@aztec/epoch-cache';
import { BlockNumber } from '@aztec/foundation/branded-types';
import { times } from '@aztec/foundation/collection';
import { Secp256k1Signer } from '@aztec/foundation/crypto/secp256k1-signer';
import { Fr } from '@aztec/foundation/curves/bn254';
import { type Logger, createLogger } from '@aztec/foundation/log';
import { retryUntil } from '@aztec/foundation/retry';
import { sleep } from '@aztec/foundation/sleep';
import { emptyChainConfig } from '@aztec/stdlib/config';
import type { WorldStateSynchronizer } from '@aztec/stdlib/interfaces/server';
import { makeBlockHeader, makeBlockProposal, mockTx } from '@aztec/stdlib/testing';
import { Tx, TxHash } from '@aztec/stdlib/tx';

import { describe, expect, it, jest } from '@jest/globals';
import { type MockProxy, mock } from 'jest-mock-extended';

import type { P2PClient } from '../../client/p2p_client.js';
import { type P2PConfig, getP2PDefaultConfig } from '../../config.js';
import type { AttestationPool } from '../../mem_pools/attestation_pool/attestation_pool.js';
import type { TxPool } from '../../mem_pools/tx_pool/index.js';
import { BatchTxRequester } from '../../services/reqresp/batch-tx-requester/batch_tx_requester.js';
import type { BatchTxRequesterLibP2PService } from '../../services/reqresp/batch-tx-requester/interface.js';
import type { ConnectionSampler } from '../../services/reqresp/connection-sampler/connection_sampler.js';
import { generatePeerIdPrivateKeys } from '../../test-helpers/generate-peer-id-private-keys.js';
import { getPorts } from '../../test-helpers/get-ports.js';
import { makeEnrs } from '../../test-helpers/make-enrs.js';
import { makeAndStartTestP2PClient, makeAndStartTestP2PClients } from '../../test-helpers/make-test-p2p-clients.js';

const TEST_TIMEOUT = 120_000;
jest.setTimeout(TEST_TIMEOUT);

describe('p2p client integration batch txs', () => {
let txPool: MockProxy<TxPool>;
let attestationPool: MockProxy<AttestationPool>;
let epochCache: MockProxy<EpochCache>;
let worldState: MockProxy<WorldStateSynchronizer>;

let mockP2PService: MockProxy<BatchTxRequesterLibP2PService>;
let connectionSampler: MockProxy<ConnectionSampler>;

let logger: Logger;
let p2pBaseConfig: P2PConfig;

let clients: P2PClient[] = [];

beforeEach(() => {
clients = [];
txPool = mock<TxPool>();
attestationPool = mock<AttestationPool>();
epochCache = mock<EpochCache>();
worldState = mock<WorldStateSynchronizer>();
connectionSampler = mock<ConnectionSampler>();
mockP2PService = mock<BatchTxRequesterLibP2PService>({ connectionSampler });
mockP2PService.txValidator.mockResolvedValue(true);

logger = createLogger('p2p:test:integration:batch');
p2pBaseConfig = { ...emptyChainConfig, ...getP2PDefaultConfig() };

//@ts-expect-error - we want to mock the getEpochAndSlotInNextL1Slot method, mocking ts is enough
epochCache.getEpochAndSlotInNextL1Slot.mockReturnValue({ ts: BigInt(0) });
epochCache.getRegisteredValidators.mockResolvedValue([]);

txPool.hasTxs.mockResolvedValue([]);
txPool.getAllTxs.mockImplementation(() => {
return Promise.resolve([] as Tx[]);
});
txPool.addTxs.mockResolvedValue(1);
txPool.getTxsByHash.mockImplementation(() => {
return Promise.resolve([] as Tx[]);
});

worldState.status.mockResolvedValue({
state: mock(),
syncSummary: {
latestBlockNumber: BlockNumber(0),
latestBlockHash: '',
finalizedBlockNumber: BlockNumber(0),
treesAreSynched: false,
oldestHistoricBlockNumber: BlockNumber(0),
},
});
logger.info(`Starting test ${expect.getState().currentTestName}`);
});

afterEach(async () => {
logger.info(`Tearing down state for ${expect.getState().currentTestName}`);
await shutdown(clients);
logger.info('Shut down p2p clients');

jest.restoreAllMocks();
jest.resetAllMocks();
jest.clearAllMocks();

clients = [];
});

// Shutdown all test clients
const shutdown = async (clients: P2PClient[]) => {
await Promise.all(clients.map(client => client.stop()));
await sleep(1000);
};

const createBlockProposal = (blockNumber: number, blockHash: Fr, txHashes: TxHash[]) => {
return makeBlockProposal({
signer: Secp256k1Signer.random(),
blockHeader: makeBlockHeader(1, { blockNumber: BlockNumber(blockNumber) }),
archiveRoot: blockHash,
txHashes,
});
};

const setupClients = async (numberOfPeers: number, txPoolMocks?: MockProxy<TxPool>[]) => {
if (txPoolMocks) {
const peerIdPrivateKeys = generatePeerIdPrivateKeys(numberOfPeers);
let ports = [];
while (true) {
try {
ports = await getPorts(numberOfPeers);
break;
} catch {
await sleep(1000);
}
}
const peerEnrs = await makeEnrs(peerIdPrivateKeys, ports, p2pBaseConfig);

for (let i = 0; i < numberOfPeers; i++) {
const client = await makeAndStartTestP2PClient(peerIdPrivateKeys[i], ports[i], peerEnrs, {
p2pBaseConfig,
mockAttestationPool: attestationPool,
mockTxPool: txPoolMocks[i],
mockEpochCache: epochCache,
mockWorldState: worldState,
logger: createLogger(`p2p:${i}`),
});
clients.push(client);
}

return;
}

clients = (
await makeAndStartTestP2PClients(numberOfPeers, {
p2pBaseConfig,
mockAttestationPool: attestationPool,
mockTxPool: txPool,
mockEpochCache: epochCache,
mockWorldState: worldState,
logger,
})
).map(x => x.client);
};

async function makeSureClientsAreStarted() {
// Give the nodes time to discover each other
await sleep(4000);
for (const c of clients) {
await retryUntil(async () => (await c.getPeers()).length == clients.length - 1, 'peers discovered', 12, 0.5);
}

logger.info('Finished waiting for clients to connect');
}

it('batch requester fetches all missing txs from multiple peers', async () => {
const NUMBER_OF_PEERS = 4;

const txCount = 20;
const txs = await Promise.all(times(txCount, () => mockTx()));
const txHashes = await Promise.all(txs.map(tx => tx.getTxHash()));

const blockNumber = 5;
const blockHash = Fr.random();
const blockProposal = await createBlockProposal(blockNumber, blockHash, txHashes);

// Distribute transactions across peers (simulating partial knowledge)
// Peer 0 has no txs (client requesting)
const peerTxDistribution = [
{ start: 0, end: 0 }, // Peer 0 (requester)
{ start: 0, end: 11 },
{ start: 6, end: 15 },
{ start: 10, end: 20 }, // Peer 3
];

// Create individual txPool mocks for each peer
const txPoolMocks: MockProxy<TxPool>[] = [];
for (let i = 0; i < NUMBER_OF_PEERS; i++) {
const peerTxPool = mock<TxPool>();
const { start, end } = peerTxDistribution[i];
const peerTxs = txs.slice(start, end);
const peerTxHashSet = new Set(peerTxs.map(tx => tx.txHash.toString()));

peerTxPool.hasTxs.mockImplementation((hashes: TxHash[]) => {
return Promise.resolve(hashes.map(h => peerTxHashSet.has(h.toString())));
});
peerTxPool.getTxsByHash.mockImplementation((hashes: TxHash[]) => {
return Promise.resolve(hashes.map(hash => peerTxs.find(t => t.txHash.equals(hash))));
});

txPoolMocks.push(peerTxPool);
}

await setupClients(NUMBER_OF_PEERS, txPoolMocks);
await makeSureClientsAreStarted();

const peerIds = clients.map(client => (client as any).p2pService.node.peerId);
connectionSampler.getPeerListSortedByConnectionCountAsc.mockReturnValue(peerIds);

attestationPool.getBlockProposal.mockResolvedValue(blockProposal);

// Client 0 is missing all transactions
const missingTxHashes = txHashes;

// Create BatchTxRequester instance
const [client0] = clients;
mockP2PService.reqResp = (client0 as any).p2pService.reqresp;

const requester = new BatchTxRequester(
missingTxHashes,
blockProposal,
undefined, // no pinned peer
5_000,
mockP2PService,
logger,
);

const fetchedTxs = await BatchTxRequester.collectAllTxs(requester.run());

// Verify all transactions were fetched
expect(fetchedTxs).toBeDefined();
const fetchedHashes = await Promise.all(fetchedTxs!.map(tx => tx.getTxHash()));
expect(
new Set(fetchedHashes.map(h => h.toString())).difference(new Set(txHashes.map(h => h.toString()))).size,
).toBe(0);
});
});
Loading
Loading