21
21
import java .io .IOException ;
22
22
import java .io .UncheckedIOException ;
23
23
import java .util .Map ;
24
+ import java .util .function .Supplier ;
24
25
import org .apache .iceberg .FileFormat ;
25
26
import org .apache .iceberg .MetricsConfig ;
26
27
import org .apache .iceberg .PartitionSpec ;
@@ -53,14 +54,16 @@ public class GenericAppenderFactory implements FileAppenderFactory<Record> {
53
54
private final Schema posDeleteRowSchema ;
54
55
private final Map <String , String > config = Maps .newHashMap ();
55
56
57
+ private static final String WRITE_METRICS_PREFIX = "write.metadata.metrics." ;
58
+
56
59
@ Deprecated
57
60
public GenericAppenderFactory (Schema schema ) {
58
- this (null , schema , PartitionSpec .unpartitioned (), null , null , null );
61
+ this (schema , PartitionSpec .unpartitioned ());
59
62
}
60
63
61
64
@ Deprecated
62
65
public GenericAppenderFactory (Schema schema , PartitionSpec spec ) {
63
- this (null , schema , spec , null , null , null );
66
+ this (schema , spec , null , null , null );
64
67
}
65
68
66
69
@ Deprecated
@@ -70,17 +73,18 @@ public GenericAppenderFactory(
70
73
int [] equalityFieldIds ,
71
74
Schema eqDeleteRowSchema ,
72
75
Schema posDeleteRowSchema ) {
73
- this (null , schema , spec , equalityFieldIds , eqDeleteRowSchema , posDeleteRowSchema );
76
+ this (null , schema , spec , null , equalityFieldIds , eqDeleteRowSchema , posDeleteRowSchema );
74
77
}
75
78
76
79
public GenericAppenderFactory (Table table ) {
77
- this (table , null , null , null , null , null );
80
+ this (table , null , null , null , null , null , null );
78
81
}
79
82
80
83
public GenericAppenderFactory (
81
84
Table table ,
82
85
Schema schema ,
83
86
PartitionSpec spec ,
87
+ Map <String , String > config ,
84
88
int [] equalityFieldIds ,
85
89
Schema eqDeleteRowSchema ,
86
90
Schema posDeleteRowSchema ) {
@@ -97,17 +101,33 @@ public GenericAppenderFactory(
97
101
this .spec = spec ;
98
102
}
99
103
104
+ if (config != null ) {
105
+ this .config .putAll (config );
106
+ }
107
+
100
108
this .equalityFieldIds = equalityFieldIds ;
101
109
this .eqDeleteRowSchema = eqDeleteRowSchema ;
102
110
this .posDeleteRowSchema = posDeleteRowSchema ;
103
111
}
104
112
105
113
public GenericAppenderFactory set (String property , String value ) {
114
+ if (property .startsWith (WRITE_METRICS_PREFIX ) && table != null ) {
115
+ throw new IllegalArgumentException (
116
+ String .format (
117
+ "Cannot set metrics property: %s directly. Use table properties instead." , property ));
118
+ }
119
+
106
120
config .put (property , value );
107
121
return this ;
108
122
}
109
123
110
124
public GenericAppenderFactory setAll (Map <String , String > properties ) {
125
+ if (properties .keySet ().stream ().anyMatch (k -> k .startsWith (WRITE_METRICS_PREFIX ))
126
+ && table != null ) {
127
+ throw new IllegalArgumentException (
128
+ "Cannot set metrics properties directly. Use table properties instead." );
129
+ }
130
+
111
131
config .putAll (properties );
112
132
return this ;
113
133
}
@@ -120,7 +140,7 @@ public FileAppender<Record> newAppender(OutputFile outputFile, FileFormat fileFo
120
140
@ Override
121
141
public FileAppender <Record > newAppender (
122
142
EncryptedOutputFile encryptedOutputFile , FileFormat fileFormat ) {
123
- MetricsConfig metricsConfig = metricsConfig ( );
143
+ MetricsConfig metricsConfig = applyMetricsConfig (() -> MetricsConfig . forTable ( table ) );
124
144
125
145
try {
126
146
switch (fileFormat ) {
@@ -181,7 +201,7 @@ public EqualityDeleteWriter<Record> newEqDeleteWriter(
181
201
Preconditions .checkNotNull (
182
202
eqDeleteRowSchema ,
183
203
"Equality delete row schema shouldn't be null when creating equality-delete writer" );
184
- MetricsConfig metricsConfig = metricsConfig ( );
204
+ MetricsConfig metricsConfig = applyMetricsConfig (() -> MetricsConfig . forTable ( table ) );
185
205
186
206
try {
187
207
switch (format ) {
@@ -235,7 +255,7 @@ public EqualityDeleteWriter<Record> newEqDeleteWriter(
235
255
@ Override
236
256
public PositionDeleteWriter <Record > newPosDeleteWriter (
237
257
EncryptedOutputFile file , FileFormat format , StructLike partition ) {
238
- MetricsConfig metricsConfig = metricsConfig ( );
258
+ MetricsConfig metricsConfig = applyMetricsConfig (() -> MetricsConfig . forPositionDelete ( table ) );
239
259
240
260
try {
241
261
switch (format ) {
@@ -282,12 +302,12 @@ public PositionDeleteWriter<Record> newPosDeleteWriter(
282
302
}
283
303
}
284
304
285
- private MetricsConfig metricsConfig ( ) {
305
+ private MetricsConfig applyMetricsConfig ( Supplier < MetricsConfig > metricsConfigSupplier ) {
286
306
MetricsConfig metricsConfig ;
287
307
if (table == null ) {
288
308
metricsConfig = MetricsConfig .fromProperties (config );
289
309
} else {
290
- metricsConfig = MetricsConfig . forTable ( table );
310
+ metricsConfig = metricsConfigSupplier . get ( );
291
311
}
292
312
293
313
return metricsConfig ;
0 commit comments