Skip to content

Commit 400a6f1

Browse files
refine delete behaviour & schema api
1 parent 03fd91f commit 400a6f1

37 files changed

+938
-491
lines changed

fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import org.apache.fluss.metadata.DatabaseDescriptor;
5252
import org.apache.fluss.metadata.DatabaseInfo;
5353
import org.apache.fluss.metadata.DeleteBehavior;
54+
import org.apache.fluss.metadata.AggFunction;
5455
import org.apache.fluss.metadata.KvFormat;
5556
import org.apache.fluss.metadata.LogFormat;
5657
import org.apache.fluss.metadata.PartitionInfo;
@@ -505,6 +506,48 @@ void testCreateTableWithDeleteBehavior() {
505506
TableInfo tableInfo2 = admin.getTableInfo(tablePath2).join();
506507
assertThat(tableInfo2.getTableConfig().getDeleteBehavior()).hasValue(DeleteBehavior.IGNORE);
507508

509+
// Test 2.5: AGGREGATE merge engine - should set delete behavior to IGNORE
510+
TablePath tablePathAggregate =
511+
TablePath.of("fluss", "test_ignore_delete_for_aggregate");
512+
Schema aggregateSchema =
513+
Schema.newBuilder()
514+
.column("id", DataTypes.INT())
515+
.column("count", DataTypes.BIGINT(), AggFunction.SUM)
516+
.primaryKey("id")
517+
.build();
518+
Map<String, String> propertiesAggregate = new HashMap<>();
519+
propertiesAggregate.put(ConfigOptions.TABLE_MERGE_ENGINE.key(), "aggregation");
520+
TableDescriptor tableDescriptorAggregate =
521+
TableDescriptor.builder()
522+
.schema(aggregateSchema)
523+
.comment("aggregate merge engine table")
524+
.properties(propertiesAggregate)
525+
.build();
526+
admin.createTable(tablePathAggregate, tableDescriptorAggregate, false).join();
527+
// Get the table and verify delete behavior is changed to IGNORE
528+
TableInfo tableInfoAggregate = admin.getTableInfo(tablePathAggregate).join();
529+
assertThat(tableInfoAggregate.getTableConfig().getDeleteBehavior())
530+
.hasValue(DeleteBehavior.IGNORE);
531+
532+
// Test 2.6: AGGREGATE merge engine with delete behavior explicitly set to ALLOW - should
533+
// be allowed
534+
TablePath tablePathAggregateAllow =
535+
TablePath.of("fluss", "test_allow_delete_for_aggregate");
536+
Map<String, String> propertiesAggregateAllow = new HashMap<>();
537+
propertiesAggregateAllow.put(ConfigOptions.TABLE_MERGE_ENGINE.key(), "aggregation");
538+
propertiesAggregateAllow.put(ConfigOptions.TABLE_DELETE_BEHAVIOR.key(), "ALLOW");
539+
TableDescriptor tableDescriptorAggregateAllow =
540+
TableDescriptor.builder()
541+
.schema(aggregateSchema)
542+
.comment("aggregate merge engine table with allow delete")
543+
.properties(propertiesAggregateAllow)
544+
.build();
545+
admin.createTable(tablePathAggregateAllow, tableDescriptorAggregateAllow, false).join();
546+
// Get the table and verify delete behavior is set to ALLOW
547+
TableInfo tableInfoAggregateAllow = admin.getTableInfo(tablePathAggregateAllow).join();
548+
assertThat(tableInfoAggregateAllow.getTableConfig().getDeleteBehavior())
549+
.hasValue(DeleteBehavior.ALLOW);
550+
508551
// Test 3: FIRST_ROW merge engine with delete behavior explicitly set to ALLOW
509552
TablePath tablePath3 = TablePath.of("fluss", "test_allow_delete_for_first_row");
510553
Map<String, String> properties3 = new HashMap<>();

fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1441,25 +1441,18 @@ public class ConfigOptions {
14411441
+ "If not set, 'last_value_ignore_nulls' will be used as default. "
14421442
+ "Field-specific aggregate functions can be configured using 'table.merge-engine.aggregate.<field-name>'.");
14431443

1444-
public static final ConfigOption<Boolean> TABLE_AGG_REMOVE_RECORD_ON_DELETE =
1445-
key("table.aggregation.remove-record-on-delete")
1446-
.booleanType()
1447-
.defaultValue(false)
1448-
.withDescription(
1449-
"Whether to remove the entire record when receiving a DELETE operation in aggregate merge engine. "
1450-
+ "By default, DELETE operations are not supported in aggregate merge engine.");
1451-
14521444
public static final ConfigOption<DeleteBehavior> TABLE_DELETE_BEHAVIOR =
14531445
key("table.delete.behavior")
14541446
.enumType(DeleteBehavior.class)
14551447
.defaultValue(DeleteBehavior.ALLOW)
14561448
.withDescription(
14571449
"Defines the delete behavior for the primary key table. "
14581450
+ "The supported delete behaviors are `allow`, `ignore`, and `disable`. "
1459-
+ "The `allow` behavior allows normal delete operations (default). "
1451+
+ "The `allow` behavior allows normal delete operations (default for default merge engine). "
14601452
+ "The `ignore` behavior silently skips delete requests without error. "
14611453
+ "The `disable` behavior rejects delete requests with a clear error message. "
1462-
+ "For tables with FIRST_ROW or VERSIONED merge engines, this option defaults to `ignore`.");
1454+
+ "For tables with FIRST_ROW, VERSIONED, or AGGREGATE merge engines, this option defaults to `ignore`. "
1455+
+ "Note: For AGGREGATE merge engine, when set to `allow`, delete operations will remove the entire record.");
14631456

14641457
public static final ConfigOption<String> TABLE_AUTO_INCREMENT_FIELDS =
14651458
key("table.auto-increment.fields")

fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -118,15 +118,6 @@ public Optional<String> getMergeEngineVersionColumn() {
118118
return config.getOptional(ConfigOptions.TABLE_MERGE_ENGINE_VERSION_COLUMN);
119119
}
120120

121-
/**
122-
* Whether to remove record on delete for aggregate merge engine.
123-
*
124-
* @return true if record should be removed on delete, false otherwise
125-
*/
126-
public boolean getAggregationRemoveRecordOnDelete() {
127-
return config.get(ConfigOptions.TABLE_AGG_REMOVE_RECORD_ON_DELETE);
128-
}
129-
130121
/**
131122
* Gets the listagg delimiter for a specific field in aggregate merge engine.
132123
*

fluss-common/src/main/java/org/apache/fluss/metadata/AggFunction.java

Lines changed: 80 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -19,93 +19,112 @@
1919

2020
import org.apache.fluss.annotation.PublicEvolving;
2121

22-
import java.util.Locale;
22+
import javax.annotation.Nullable;
23+
24+
import java.io.Serializable;
25+
import java.util.Collections;
26+
import java.util.HashMap;
27+
import java.util.Map;
28+
import java.util.Objects;
2329

2430
/**
25-
* Aggregation function for aggregate merge engine.
31+
* Aggregation function with optional parameters for aggregate merge engine.
32+
*
33+
* <p>This class represents a parameterized aggregation function that can be applied to non-primary
34+
* key columns in aggregation merge engine tables. It encapsulates both the function type and
35+
* function-specific parameters (e.g., delimiter for LISTAGG).
36+
*
37+
* <p>Use {@link AggFunctions} utility class to create instances:
38+
*
39+
* <pre>{@code
40+
* AggFunction sumFunc = AggFunctions.SUM();
41+
* AggFunction listaggFunc = AggFunctions.LISTAGG(";");
42+
* }</pre>
2643
*
27-
* <p>This enum represents all supported aggregation functions that can be applied to non-primary
28-
* key columns in aggregation merge engine tables.
44+
* @since 0.9
2945
*/
3046
@PublicEvolving
31-
public enum AggFunction {
32-
// Numeric aggregation
33-
SUM("sum"),
34-
PRODUCT("product"),
35-
MAX("max"),
36-
MIN("min"),
47+
public final class AggFunction implements Serializable {
3748

38-
// Value selection
39-
LAST_VALUE("last_value"),
40-
LAST_VALUE_IGNORE_NULLS("last_value_ignore_nulls"),
41-
FIRST_VALUE("first_value"),
42-
FIRST_VALUE_IGNORE_NULLS("first_value_ignore_nulls"),
49+
private static final long serialVersionUID = 1L;
4350

44-
// String aggregation
45-
LISTAGG("listagg"),
46-
STRING_AGG("string_agg"), // Alias for LISTAGG - maps to same factory
51+
private final AggFunctionType type;
52+
private final Map<String, String> parameters;
4753

48-
// Boolean aggregation
49-
BOOL_AND("bool_and"),
50-
BOOL_OR("bool_or");
51-
52-
private final String identifier;
53-
54-
AggFunction(String identifier) {
55-
this.identifier = identifier;
54+
/**
55+
* Creates an aggregation function with the specified type and parameters.
56+
*
57+
* @param type the aggregation function type
58+
* @param parameters the function parameters (nullable)
59+
*/
60+
AggFunction(AggFunctionType type, @Nullable Map<String, String> parameters) {
61+
this.type = Objects.requireNonNull(type, "Aggregation function type must not be null");
62+
this.parameters =
63+
parameters == null || parameters.isEmpty()
64+
? Collections.emptyMap()
65+
: Collections.unmodifiableMap(new HashMap<>(parameters));
5666
}
5767

5868
/**
59-
* Returns the identifier string for this aggregation function.
69+
* Returns the aggregation function type.
6070
*
61-
* @return the identifier string
71+
* @return the function type
6272
*/
63-
public String getIdentifier() {
64-
return identifier;
73+
public AggFunctionType getType() {
74+
return type;
6575
}
6676

6777
/**
68-
* Converts a string to an AggFunction enum value.
69-
*
70-
* <p>This method supports multiple naming formats:
78+
* Returns the function parameters.
7179
*
72-
* <ul>
73-
* <li>Underscore format: "last_value_ignore_nulls"
74-
* <li>Hyphen format: "last-value-ignore-nulls"
75-
* <li>Case insensitive matching
76-
* </ul>
77-
*
78-
* <p>Note: For string_agg, this will return STRING_AGG enum, but the server-side factory will
79-
* map it to the same implementation as listagg.
80+
* @return an immutable map of parameters
81+
*/
82+
public Map<String, String> getParameters() {
83+
return parameters;
84+
}
85+
86+
/**
87+
* Gets a specific parameter value.
8088
*
81-
* @param name the aggregation function name
82-
* @return the AggFunction enum value, or null if not found
89+
* @param key the parameter key
90+
* @return the parameter value, or null if not found
8391
*/
84-
public static AggFunction fromString(String name) {
85-
if (name == null || name.trim().isEmpty()) {
86-
return null;
87-
}
92+
@Nullable
93+
public String getParameter(String key) {
94+
return parameters.get(key);
95+
}
8896

89-
// Normalize the input: convert hyphens to underscores and lowercase
90-
String normalized = name.replace('-', '_').toLowerCase(Locale.ROOT).trim();
97+
/**
98+
* Checks if this function has any parameters.
99+
*
100+
* @return true if parameters are present, false otherwise
101+
*/
102+
public boolean hasParameters() {
103+
return !parameters.isEmpty();
104+
}
91105

92-
// Try direct match with identifier
93-
for (AggFunction aggFunc : values()) {
94-
if (aggFunc.identifier.equals(normalized)) {
95-
return aggFunc;
96-
}
106+
@Override
107+
public boolean equals(Object o) {
108+
if (this == o) {
109+
return true;
97110
}
111+
if (o == null || getClass() != o.getClass()) {
112+
return false;
113+
}
114+
AggFunction that = (AggFunction) o;
115+
return type == that.type && parameters.equals(that.parameters);
116+
}
98117

99-
return null;
118+
@Override
119+
public int hashCode() {
120+
return Objects.hash(type, parameters);
100121
}
101122

102-
/**
103-
* Converts this AggFunction to its string identifier.
104-
*
105-
* @return the identifier string
106-
*/
107123
@Override
108124
public String toString() {
109-
return identifier;
125+
if (parameters.isEmpty()) {
126+
return type.getIdentifier();
127+
}
128+
return type.getIdentifier() + parameters;
110129
}
111130
}
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.metadata;
19+
20+
import org.apache.fluss.annotation.PublicEvolving;
21+
22+
import java.util.Locale;
23+
24+
/**
25+
* Aggregation function type for aggregate merge engine.
26+
*
27+
* <p>This enum represents all supported aggregation function types that can be applied to
28+
* non-primary key columns in aggregation merge engine tables.
29+
*/
30+
@PublicEvolving
31+
public enum AggFunctionType {
32+
// Numeric aggregation
33+
SUM("sum"),
34+
PRODUCT("product"),
35+
MAX("max"),
36+
MIN("min"),
37+
38+
// Value selection
39+
LAST_VALUE("last_value"),
40+
LAST_VALUE_IGNORE_NULLS("last_value_ignore_nulls"),
41+
FIRST_VALUE("first_value"),
42+
FIRST_VALUE_IGNORE_NULLS("first_value_ignore_nulls"),
43+
44+
// String aggregation
45+
LISTAGG("listagg"),
46+
STRING_AGG("string_agg"), // Alias for LISTAGG - maps to same factory
47+
48+
// Boolean aggregation
49+
BOOL_AND("bool_and"),
50+
BOOL_OR("bool_or");
51+
52+
private final String identifier;
53+
54+
AggFunctionType(String identifier) {
55+
this.identifier = identifier;
56+
}
57+
58+
/**
59+
* Returns the identifier string for this aggregation function type.
60+
*
61+
* @return the identifier string
62+
*/
63+
public String getIdentifier() {
64+
return identifier;
65+
}
66+
67+
/**
68+
* Converts a string to an AggFunctionType enum value.
69+
*
70+
* <p>This method supports multiple naming formats:
71+
*
72+
* <ul>
73+
* <li>Underscore format: "last_value_ignore_nulls"
74+
* <li>Hyphen format: "last-value-ignore-nulls"
75+
* <li>Case insensitive matching
76+
* </ul>
77+
*
78+
* <p>Note: For string_agg, this will return STRING_AGG enum, but the server-side factory will
79+
* map it to the same implementation as listagg.
80+
*
81+
* @param name the aggregation function type name
82+
* @return the AggFunctionType enum value, or null if not found
83+
*/
84+
public static AggFunctionType fromString(String name) {
85+
if (name == null || name.trim().isEmpty()) {
86+
return null;
87+
}
88+
89+
// Normalize the input: convert hyphens to underscores and lowercase
90+
String normalized = name.replace('-', '_').toLowerCase(Locale.ROOT).trim();
91+
92+
// Try direct match with identifier
93+
for (AggFunctionType aggFunc : values()) {
94+
if (aggFunc.identifier.equals(normalized)) {
95+
return aggFunc;
96+
}
97+
}
98+
99+
return null;
100+
}
101+
102+
/**
103+
* Converts this AggFunctionType to its string identifier.
104+
*
105+
* @return the identifier string
106+
*/
107+
@Override
108+
public String toString() {
109+
return identifier;
110+
}
111+
}

0 commit comments

Comments
 (0)