Skip to content

Commit d78a244

Browse files
Use more internal iteration (#689)
* External -> internal consolidation * External -> internal merging * Replace VecChunker with ContainerChunker
1 parent a8eec26 commit d78a244

7 files changed

Lines changed: 350 additions & 506 deletions

File tree

differential-dataflow/src/consolidation.rs

Lines changed: 50 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,11 @@
1010
//! you need specific behavior, it may be best to defensively copy, paste, and maintain the
1111
//! specific behavior you require.
1212
13-
use std::cmp::Ordering;
1413
use std::collections::VecDeque;
15-
use timely::container::{ContainerBuilder, DrainContainer, PushInto};
14+
use columnation::Columnation;
15+
use timely::container::{ContainerBuilder, PushInto};
1616
use crate::Data;
17-
use crate::difference::{IsZero, Semigroup};
17+
use crate::difference::Semigroup;
1818

1919
/// Sorts and consolidates `vec`.
2020
///
@@ -232,115 +232,63 @@ where
232232
}
233233
}
234234

235-
/// Layout of containers and their read items to be consolidated.
235+
/// A container that can sort and consolidate its contents internally.
236236
///
237-
/// This trait specifies behavior to extract keys and diffs from container's read
238-
/// items. Consolidation accumulates the diffs per key.
237+
/// The container knows its own layout — how to sort its elements, how to
238+
/// compare adjacent entries, and how to merge diffs. The caller provides
239+
/// a `target` container to receive the consolidated output, allowing
240+
/// reuse of allocations across calls.
239241
///
240-
/// The trait requires `Container` to have access to its `Item` GAT.
241-
pub trait ConsolidateLayout: DrainContainer {
242-
/// Key portion of data, essentially everything minus the diff
243-
type Key<'a>: Eq where Self: 'a;
244-
245-
/// GAT diff type.
246-
type Diff<'a>;
247-
248-
/// Owned diff type.
249-
type DiffOwned: for<'a> Semigroup<Self::Diff<'a>>;
250-
251-
/// Converts a reference diff into an owned diff.
252-
fn owned_diff(diff: Self::Diff<'_>) -> Self::DiffOwned;
253-
254-
/// Deconstruct an item into key and diff. Must be cheap.
255-
fn into_parts(item: Self::Item<'_>) -> (Self::Key<'_>, Self::Diff<'_>);
256-
257-
/// Push an element to a compatible container.
258-
///
259-
/// This function is odd to have, so let's explain why it exists. Ideally, the container
260-
/// would accept a `(key, diff)` pair and we wouldn't need this function. However, we
261-
/// might never be in a position where this is true: Vectors can push any `T`, which would
262-
/// collide with a specific implementation for pushing tuples of mixes GATs and owned types.
263-
///
264-
/// For this reason, we expose a function here that takes a GAT key and an owned diff, and
265-
/// leave it to the implementation to "patch" a suitable item that can be pushed into `self`.
266-
fn push_with_diff(&mut self, key: Self::Key<'_>, diff: Self::DiffOwned);
267-
268-
/// Compare two items by key to sort containers.
269-
fn cmp(item1: &Self::Item<'_>, item2: &Self::Item<'_>) -> Ordering;
270-
271-
/// Returns the number of items in the container.
242+
/// After the call, `target` contains the sorted, consolidated data and
243+
/// `self` may be empty or in an unspecified state (implementations should
244+
/// document this).
245+
pub trait Consolidate {
246+
/// The number of elements in the container.
272247
fn len(&self) -> usize;
273-
274-
/// Clear the container. Afterwards, `len()` should return 0.
248+
/// Clear the container.
275249
fn clear(&mut self);
250+
/// Sort and consolidate `self` into `target`.
251+
fn consolidate_into(&mut self, target: &mut Self);
252+
}
276253

277-
/// Consolidate the supplied container.
254+
impl<D: Ord, T: Ord, R: Semigroup> Consolidate for Vec<(D, T, R)> {
255+
fn len(&self) -> usize { Vec::len(self) }
256+
fn clear(&mut self) { Vec::clear(self) }
278257
fn consolidate_into(&mut self, target: &mut Self) {
279-
// Sort input data
280-
let mut permutation = Vec::with_capacity(self.len());
281-
permutation.extend(self.drain());
282-
permutation.sort_by(|a, b| Self::cmp(a, b));
283-
284-
// Iterate over the data, accumulating diffs for like keys.
285-
let mut iter = permutation.drain(..);
286-
if let Some(item) = iter.next() {
287-
288-
let (k, d) = Self::into_parts(item);
289-
let mut prev_key = k;
290-
let mut prev_diff = Self::owned_diff(d);
291-
292-
for item in iter {
293-
let (next_key, next_diff) = Self::into_parts(item);
294-
if next_key == prev_key {
295-
prev_diff.plus_equals(&next_diff);
296-
}
297-
else {
298-
if !prev_diff.is_zero() {
299-
target.push_with_diff(prev_key, prev_diff);
300-
}
301-
prev_key = next_key;
302-
prev_diff = Self::owned_diff(next_diff);
303-
}
304-
}
305-
306-
if !prev_diff.is_zero() {
307-
target.push_with_diff(prev_key, prev_diff);
308-
}
309-
}
258+
consolidate_updates(self);
259+
std::mem::swap(self, target);
310260
}
311261
}
312262

313-
impl<D, T, R> ConsolidateLayout for Vec<(D, T, R)>
314-
where
315-
D: Ord + Clone + 'static,
316-
T: Ord + Clone + 'static,
317-
R: Semigroup + Clone + 'static,
318-
{
319-
type Key<'a> = (D, T) where Self: 'a;
320-
type Diff<'a> = R where Self: 'a;
321-
type DiffOwned = R;
322-
323-
fn owned_diff(diff: Self::Diff<'_>) -> Self::DiffOwned { diff }
324-
325-
fn into_parts((data, time, diff): Self::Item<'_>) -> (Self::Key<'_>, Self::Diff<'_>) {
326-
((data, time), diff)
327-
}
328-
329-
fn cmp<'a>(item1: &Self::Item<'_>, item2: &Self::Item<'_>) -> Ordering {
330-
(&item1.0, &item1.1).cmp(&(&item2.0, &item2.1))
331-
}
332-
333-
fn push_with_diff(&mut self, (data, time): Self::Key<'_>, diff: Self::DiffOwned) {
334-
self.push((data, time, diff));
335-
}
336-
337-
#[inline] fn len(&self) -> usize { Vec::len(self) }
338-
#[inline] fn clear(&mut self) { Vec::clear(self) }
339-
340-
/// Consolidate the supplied container.
263+
impl<D: Ord + Columnation, T: Ord + Columnation, R: Semigroup + Columnation> Consolidate for crate::containers::TimelyStack<(D, T, R)> {
264+
fn len(&self) -> usize { self[..].len() }
265+
fn clear(&mut self) { crate::containers::TimelyStack::clear(self) }
341266
fn consolidate_into(&mut self, target: &mut Self) {
342-
consolidate_updates(self);
343-
std::mem::swap(self, target);
267+
let len = self[..].len();
268+
let mut indices: Vec<usize> = (0..len).collect();
269+
indices.sort_unstable_by(|&i, &j| {
270+
let (d1, t1, _) = &self[i];
271+
let (d2, t2, _) = &self[j];
272+
(d1, t1).cmp(&(d2, t2))
273+
});
274+
target.clear();
275+
let mut idx = 0;
276+
while idx < indices.len() {
277+
let (d, t, r) = &self[indices[idx]];
278+
let mut r_owned = r.clone();
279+
idx += 1;
280+
while idx < indices.len() {
281+
let (d2, t2, r2) = &self[indices[idx]];
282+
if d == d2 && t == t2 {
283+
r_owned.plus_equals(r2);
284+
idx += 1;
285+
} else { break; }
286+
}
287+
if !r_owned.is_zero() {
288+
target.copy_destructured(d, t, &r_owned);
289+
}
290+
}
291+
self.clear();
344292
}
345293
}
346294

differential-dataflow/src/operators/arrange/arrangement.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ use crate::{Data, VecCollection, AsCollection};
3030
use crate::difference::Semigroup;
3131
use crate::lattice::Lattice;
3232
use crate::trace::{self, Trace, TraceReader, BatchReader, Batcher, Builder, Cursor};
33-
use crate::trace::implementations::merge_batcher::container::MergerChunk;
33+
use crate::trace::implementations::merge_batcher::container::InternalMerge;
3434

3535
use trace::wrappers::enter::{TraceEnter, BatchEnter,};
3636
use trace::wrappers::enter_at::TraceEnter as TraceEnterAt;
@@ -259,7 +259,7 @@ where
259259
Time=T1::Time,
260260
Diff: Abelian,
261261
>+'static,
262-
Bu: Builder<Time=G::Timestamp, Output = T2::Batch, Input: MergerChunk + PushInto<((T1::KeyOwn, T2::ValOwn), T2::Time, T2::Diff)>>,
262+
Bu: Builder<Time=G::Timestamp, Output = T2::Batch, Input: InternalMerge + PushInto<((T1::KeyOwn, T2::ValOwn), T2::Time, T2::Diff)>>,
263263
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(T2::ValOwn, T2::Diff)>)+'static,
264264
{
265265
self.reduce_core::<_,Bu,T2>(name, move |key, input, output, change| {
@@ -281,7 +281,7 @@ where
281281
ValOwn: Data,
282282
Time=T1::Time,
283283
>+'static,
284-
Bu: Builder<Time=G::Timestamp, Output = T2::Batch, Input: MergerChunk + PushInto<((T1::KeyOwn, T2::ValOwn), T2::Time, T2::Diff)>>,
284+
Bu: Builder<Time=G::Timestamp, Output = T2::Batch, Input: InternalMerge + PushInto<((T1::KeyOwn, T2::ValOwn), T2::Time, T2::Diff)>>,
285285
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(T2::ValOwn, T2::Diff)>, &mut Vec<(T2::ValOwn, T2::Diff)>)+'static,
286286
{
287287
use crate::operators::reduce::reduce_trace;

differential-dataflow/src/operators/reduce.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use crate::operators::arrange::{Arranged, TraceAgent};
2020
use crate::trace::{BatchReader, Cursor, Trace, Builder, ExertionLogic, Description};
2121
use crate::trace::cursor::CursorList;
2222
use crate::trace::implementations::containers::BatchContainer;
23-
use crate::trace::implementations::merge_batcher::container::MergerChunk;
23+
use crate::trace::implementations::merge_batcher::container::InternalMerge;
2424
use crate::trace::TraceReader;
2525

2626
/// A key-wise reduction of values in an input trace.
@@ -31,7 +31,7 @@ where
3131
G: Scope<Timestamp=T1::Time>,
3232
T1: TraceReader<KeyOwn: Ord> + Clone + 'static,
3333
T2: for<'a> Trace<Key<'a>=T1::Key<'a>, KeyOwn=T1::KeyOwn, ValOwn: Data, Time=T1::Time> + 'static,
34-
Bu: Builder<Time=T2::Time, Output = T2::Batch, Input: MergerChunk + PushInto<((T1::KeyOwn, T2::ValOwn), T2::Time, T2::Diff)>>,
34+
Bu: Builder<Time=T2::Time, Output = T2::Batch, Input: InternalMerge + PushInto<((T1::KeyOwn, T2::ValOwn), T2::Time, T2::Diff)>>,
3535
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(T2::ValOwn,T2::Diff)>, &mut Vec<(T2::ValOwn, T2::Diff)>)+'static,
3636
{
3737
let mut result_trace = None;

differential-dataflow/src/trace/implementations/chunker.rs

Lines changed: 9 additions & 128 deletions
Original file line numberDiff line numberDiff line change
@@ -7,124 +7,15 @@ use timely::Container;
77
use timely::container::{ContainerBuilder, DrainContainer, PushInto, SizableContainer};
88

99
use crate::containers::TimelyStack;
10-
use crate::consolidation::{consolidate_updates, ConsolidateLayout};
10+
use crate::consolidation::{consolidate_updates, Consolidate};
1111
use crate::difference::Semigroup;
1212

13-
/// Chunk a stream of vectors into chains of vectors.
14-
pub struct VecChunker<T> {
15-
pending: Vec<T>,
16-
ready: VecDeque<Vec<T>>,
17-
empty: Option<Vec<T>>,
18-
}
19-
20-
impl<T> Default for VecChunker<T> {
21-
fn default() -> Self {
22-
Self {
23-
pending: Vec::default(),
24-
ready: VecDeque::default(),
25-
empty: None,
26-
}
27-
}
28-
}
29-
30-
impl<K, V, T, R> VecChunker<((K, V), T, R)>
31-
where
32-
K: Ord,
33-
V: Ord,
34-
T: Ord,
35-
R: Semigroup,
36-
{
37-
const BUFFER_SIZE_BYTES: usize = 8 << 10;
38-
fn chunk_capacity() -> usize {
39-
let size = ::std::mem::size_of::<((K, V), T, R)>();
40-
if size == 0 {
41-
Self::BUFFER_SIZE_BYTES
42-
} else if size <= Self::BUFFER_SIZE_BYTES {
43-
Self::BUFFER_SIZE_BYTES / size
44-
} else {
45-
1
46-
}
47-
}
48-
49-
/// Form chunks out of pending data, if needed. This function is meant to be applied to
50-
/// potentially full buffers, and ensures that if the buffer was full when called it is at most
51-
/// half full when the function returns.
52-
///
53-
/// `form_chunk` does the following:
54-
/// * If pending is full, consolidate.
55-
/// * If after consolidation it's more than half full, peel off chunks,
56-
/// leaving behind any partial chunk in pending.
57-
fn form_chunk(&mut self) {
58-
consolidate_updates(&mut self.pending);
59-
if self.pending.len() >= Self::chunk_capacity() {
60-
while self.pending.len() > Self::chunk_capacity() {
61-
let mut chunk = Vec::with_capacity(Self::chunk_capacity());
62-
chunk.extend(self.pending.drain(..chunk.capacity()));
63-
self.ready.push_back(chunk);
64-
}
65-
}
66-
}
67-
}
68-
69-
impl<'a, K, V, T, R> PushInto<&'a mut Vec<((K, V), T, R)>> for VecChunker<((K, V), T, R)>
70-
where
71-
K: Ord + Clone,
72-
V: Ord + Clone,
73-
T: Ord + Clone,
74-
R: Semigroup + Clone,
75-
{
76-
fn push_into(&mut self, container: &'a mut Vec<((K, V), T, R)>) {
77-
// Ensure `self.pending` has the desired capacity. We should never have a larger capacity
78-
// because we don't write more than capacity elements into the buffer.
79-
// Important: Consolidation requires `pending` to have twice the chunk capacity to
80-
// amortize its cost. Otherwise, it risks to do quadratic work.
81-
if self.pending.capacity() < Self::chunk_capacity() * 2 {
82-
self.pending.reserve(Self::chunk_capacity() * 2 - self.pending.len());
83-
}
84-
85-
let mut drain = container.drain(..).peekable();
86-
while drain.peek().is_some() {
87-
self.pending.extend((&mut drain).take(self.pending.capacity() - self.pending.len()));
88-
if self.pending.len() == self.pending.capacity() {
89-
self.form_chunk();
90-
}
91-
}
92-
}
93-
}
94-
95-
impl<K, V, T, R> ContainerBuilder for VecChunker<((K, V), T, R)>
96-
where
97-
K: Ord + Clone + 'static,
98-
V: Ord + Clone + 'static,
99-
T: Ord + Clone + 'static,
100-
R: Semigroup + Clone + 'static,
101-
{
102-
type Container = Vec<((K, V), T, R)>;
103-
104-
fn extract(&mut self) -> Option<&mut Self::Container> {
105-
if let Some(ready) = self.ready.pop_front() {
106-
self.empty = Some(ready);
107-
self.empty.as_mut()
108-
} else {
109-
None
110-
}
111-
}
112-
113-
fn finish(&mut self) -> Option<&mut Self::Container> {
114-
if !self.pending.is_empty() {
115-
consolidate_updates(&mut self.pending);
116-
while !self.pending.is_empty() {
117-
let mut chunk = Vec::with_capacity(Self::chunk_capacity());
118-
chunk.extend(self.pending.drain(..std::cmp::min(self.pending.len(), chunk.capacity())));
119-
self.ready.push_back(chunk);
120-
}
121-
}
122-
self.empty = self.ready.pop_front();
123-
self.empty.as_mut()
124-
}
125-
}
126-
127-
/// Chunk a stream of vectors into chains of vectors.
13+
/// Chunk a stream of vectors into chains of columnation stacks.
14+
///
15+
/// This chunker accumulates into a `Vec` (not a `TimelyStack`) for efficient
16+
/// in-place sorting and consolidation, then copies the consolidated results
17+
/// into `TimelyStack` chunks. This avoids the cost of sorting through
18+
/// columnation indirection.
12819
pub struct ColumnationChunker<T: Columnation> {
12920
pending: Vec<T>,
13021
ready: VecDeque<TimelyStack<T>>,
@@ -159,14 +50,6 @@ where
15950
}
16051
}
16152

162-
/// Form chunks out of pending data, if needed. This function is meant to be applied to
163-
/// potentially full buffers, and ensures that if the buffer was full when called it is at most
164-
/// half full when the function returns.
165-
///
166-
/// `form_chunk` does the following:
167-
/// * If pending is full, consolidate.
168-
/// * If after consolidation it's more than half full, peel off chunks,
169-
/// leaving behind any partial chunk in pending.
17053
fn form_chunk(&mut self) {
17154
consolidate_updates(&mut self.pending);
17255
if self.pending.len() >= Self::chunk_capacity() {
@@ -188,8 +71,6 @@ where
18871
R: Columnation + Semigroup + Clone,
18972
{
19073
fn push_into(&mut self, container: &'a mut Vec<(D, T, R)>) {
191-
// Ensure `self.pending` has the desired capacity. We should never have a larger capacity
192-
// because we don't write more than capacity elements into the buffer.
19374
if self.pending.capacity() < Self::chunk_capacity() * 2 {
19475
self.pending.reserve(Self::chunk_capacity() * 2 - self.pending.len());
19576
}
@@ -257,7 +138,7 @@ where
257138
Input: DrainContainer,
258139
Output: Default
259140
+ SizableContainer
260-
+ ConsolidateLayout
141+
+ Consolidate
261142
+ PushInto<Input::Item<'a>>,
262143
{
263144
fn push_into(&mut self, container: &'a mut Input) {
@@ -283,7 +164,7 @@ where
283164

284165
impl<Output> ContainerBuilder for ContainerChunker<Output>
285166
where
286-
Output: SizableContainer + ConsolidateLayout + Container,
167+
Output: SizableContainer + Consolidate + Container,
287168
{
288169
type Container = Output;
289170

0 commit comments

Comments
 (0)