Skip to content

Commit 60f66d8

Browse files
rzeyde-varadamartint
authored andcommitted
Implement Count and Histogram metrics in trino-plugin-toolkit
Also, add an integration test to TestMemorySmoke
1 parent 38305b7 commit 60f66d8

File tree

7 files changed

+329
-0
lines changed

7 files changed

+329
-0
lines changed

lib/trino-plugin-toolkit/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,11 @@
4848
<artifactId>slice</artifactId>
4949
</dependency>
5050

51+
<dependency>
52+
<groupId>io.airlift</groupId>
53+
<artifactId>stats</artifactId>
54+
</dependency>
55+
5156
<dependency>
5257
<groupId>io.airlift</groupId>
5358
<artifactId>units</artifactId>
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.trino.plugin.base.metrics;
15+
16+
import com.fasterxml.jackson.annotation.JsonCreator;
17+
import com.fasterxml.jackson.annotation.JsonProperty;
18+
import io.trino.spi.metrics.Count;
19+
20+
import java.util.Objects;
21+
22+
import static com.google.common.base.MoreObjects.toStringHelper;
23+
24+
public class LongCount
25+
implements Count<LongCount>
26+
{
27+
private final long total;
28+
29+
@JsonCreator
30+
public LongCount(long total)
31+
{
32+
this.total = total;
33+
}
34+
35+
@JsonProperty("total")
36+
@Override
37+
public long getTotal()
38+
{
39+
return total;
40+
}
41+
42+
@Override
43+
public LongCount mergeWith(LongCount other)
44+
{
45+
return new LongCount(total + other.getTotal());
46+
}
47+
48+
@Override
49+
public boolean equals(Object o)
50+
{
51+
if (this == o) {
52+
return true;
53+
}
54+
if (o == null || getClass() != o.getClass()) {
55+
return false;
56+
}
57+
LongCount count = (LongCount) o;
58+
return total == count.total;
59+
}
60+
61+
@Override
62+
public int hashCode()
63+
{
64+
return Objects.hash(total);
65+
}
66+
67+
@Override
68+
public String toString()
69+
{
70+
return toStringHelper(this)
71+
.add("total", total)
72+
.toString();
73+
}
74+
}
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
15+
package io.trino.plugin.base.metrics;
16+
17+
import com.fasterxml.jackson.annotation.JsonCreator;
18+
import com.fasterxml.jackson.annotation.JsonProperty;
19+
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
20+
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
21+
import com.fasterxml.jackson.databind.util.StdConverter;
22+
import io.airlift.slice.Slice;
23+
import io.airlift.stats.TDigest;
24+
import io.trino.spi.metrics.Distribution;
25+
26+
import java.util.Base64;
27+
28+
import static io.airlift.slice.Slices.wrappedBuffer;
29+
30+
public class TDigestHistogram
31+
implements Distribution<TDigestHistogram>
32+
{
33+
@JsonSerialize(converter = TDigestToBase64Converter.class)
34+
@JsonDeserialize(converter = Base64ToTDigestConverter.class)
35+
private final TDigest digest;
36+
37+
@JsonCreator
38+
public TDigestHistogram(TDigest digest)
39+
{
40+
this.digest = digest;
41+
}
42+
43+
@JsonProperty
44+
public TDigest getDigest()
45+
{
46+
return digest;
47+
}
48+
49+
@Override
50+
public TDigestHistogram mergeWith(TDigestHistogram other)
51+
{
52+
TDigest result = TDigest.copyOf(digest);
53+
result.mergeWith(other.getDigest());
54+
return new TDigestHistogram(result);
55+
}
56+
57+
@Override
58+
public long getTotal()
59+
{
60+
return (long) digest.getCount();
61+
}
62+
63+
@Override
64+
public double getPercentile(double percentile)
65+
{
66+
return digest.valueAt(percentile / 100.0);
67+
}
68+
69+
public static class TDigestToBase64Converter
70+
extends StdConverter<TDigest, String>
71+
{
72+
public TDigestToBase64Converter()
73+
{
74+
}
75+
76+
@Override
77+
public String convert(TDigest value)
78+
{
79+
Slice slice = value.serialize();
80+
return Base64.getEncoder().encodeToString(slice.getBytes());
81+
}
82+
}
83+
84+
public static class Base64ToTDigestConverter
85+
extends StdConverter<String, TDigest>
86+
{
87+
public Base64ToTDigestConverter()
88+
{
89+
}
90+
91+
@Override
92+
public TDigest convert(String value)
93+
{
94+
Slice slice = wrappedBuffer(Base64.getDecoder().decode(value));
95+
return TDigest.deserialize(slice);
96+
}
97+
}
98+
}
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
15+
package io.trino.plugin.base.metrics;
16+
17+
import com.google.common.collect.ImmutableMap;
18+
import io.airlift.json.JsonCodec;
19+
import io.airlift.stats.TDigest;
20+
import io.trino.spi.metrics.Metric;
21+
import io.trino.spi.metrics.Metrics;
22+
import org.testng.annotations.Test;
23+
24+
import java.util.Arrays;
25+
import java.util.Map;
26+
27+
import static org.assertj.core.api.Assertions.assertThat;
28+
29+
public class TestMetrics
30+
{
31+
@Test
32+
public void testMergeCount()
33+
{
34+
Metrics m1 = new Metrics(ImmutableMap.of(
35+
"a", new LongCount(1),
36+
"b", new LongCount(2)));
37+
Metrics m2 = new Metrics(ImmutableMap.of(
38+
"b", new LongCount(3),
39+
"c", new LongCount(4)));
40+
Metrics merged = merge(m1, m2);
41+
Map<String, Metric> expectedMap = ImmutableMap.of(
42+
"a", new LongCount(1),
43+
"b", new LongCount(5),
44+
"c", new LongCount(4));
45+
assertThat(merged.getMetrics()).isEqualTo(expectedMap);
46+
}
47+
48+
@Test
49+
public void testMergeHistogram()
50+
{
51+
TDigest d1 = new TDigest();
52+
d1.add(10.0, 1);
53+
54+
TDigest d2 = new TDigest();
55+
d2.add(5.0, 2);
56+
57+
Metrics m1 = new Metrics(ImmutableMap.of("a", new TDigestHistogram(d1)));
58+
Metrics m2 = new Metrics(ImmutableMap.of("a", new TDigestHistogram(d2)));
59+
TDigestHistogram merged = (TDigestHistogram) merge(m1, m2).getMetrics().get("a");
60+
61+
assertThat(merged.getTotal()).isEqualTo(3L);
62+
assertThat(merged.getPercentile(0)).isEqualTo(5.0);
63+
assertThat(merged.getPercentile(100)).isEqualTo(10.0);
64+
}
65+
66+
@Test
67+
public void testHistogramJson()
68+
{
69+
JsonCodec<TDigestHistogram> codec = JsonCodec.jsonCodec(TDigestHistogram.class);
70+
71+
TDigest digest = new TDigest();
72+
digest.add(123);
73+
74+
String json = codec.toJson(new TDigestHistogram(digest));
75+
TDigestHistogram result = codec.fromJson(json);
76+
assertThat(result.getDigest().getCount()).isEqualTo(digest.getCount());
77+
}
78+
79+
@Test(expectedExceptions = ClassCastException.class)
80+
public void testFailIncompatibleTypes()
81+
{
82+
Metrics m1 = new Metrics(ImmutableMap.of("a", new TDigestHistogram(new TDigest())));
83+
Metrics m2 = new Metrics(ImmutableMap.of("a", new LongCount(0)));
84+
merge(m1, m2);
85+
}
86+
87+
private static Metrics merge(Metrics... metrics)
88+
{
89+
return Arrays.stream(metrics).reduce(Metrics.EMPTY, Metrics::mergeWith);
90+
}
91+
}

plugin/trino-memory/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,11 @@
1818
</properties>
1919

2020
<dependencies>
21+
<dependency>
22+
<groupId>io.trino</groupId>
23+
<artifactId>trino-plugin-toolkit</artifactId>
24+
</dependency>
25+
2126
<dependency>
2227
<groupId>io.airlift</groupId>
2328
<artifactId>bootstrap</artifactId>

plugin/trino-memory/src/main/java/io/trino/plugin/memory/MemoryPageSourceProvider.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
*/
1414
package io.trino.plugin.memory;
1515

16+
import com.google.common.collect.ImmutableMap;
17+
import io.trino.plugin.base.metrics.LongCount;
1618
import io.trino.spi.Page;
1719
import io.trino.spi.connector.ColumnHandle;
1820
import io.trino.spi.connector.ConnectorPageSource;
@@ -23,6 +25,7 @@
2325
import io.trino.spi.connector.ConnectorTransactionHandle;
2426
import io.trino.spi.connector.DynamicFilter;
2527
import io.trino.spi.connector.FixedPageSource;
28+
import io.trino.spi.metrics.Metrics;
2629
import io.trino.spi.predicate.Domain;
2730
import io.trino.spi.predicate.TupleDomain;
2831
import io.trino.spi.type.TypeUtils;
@@ -89,6 +92,7 @@ private static class DynamicFilteringPageSource
8992
private final List<ColumnHandle> columns;
9093
private final DynamicFilter dynamicFilter;
9194
private final boolean enableLazyDynamicFiltering;
95+
private long rows;
9296

9397
private DynamicFilteringPageSource(FixedPageSource delegate, List<ColumnHandle> columns, DynamicFilter dynamicFilter, boolean enableLazyDynamicFiltering)
9498
{
@@ -131,6 +135,9 @@ public Page getNextPage()
131135
if (page != null && !predicate.isAll()) {
132136
page = applyFilter(page, predicate.transformKeys(columns::indexOf).getDomains().get());
133137
}
138+
if (page != null) {
139+
rows += page.getPositionCount();
140+
}
134141
return page;
135142
}
136143

@@ -154,6 +161,15 @@ public void close()
154161
{
155162
delegate.close();
156163
}
164+
165+
@Override
166+
public Metrics getMetrics()
167+
{
168+
return new Metrics(ImmutableMap.of(
169+
"rows", new LongCount(rows),
170+
"finished", new LongCount(isFinished() ? 1 : 0),
171+
"started", new LongCount(1)));
172+
}
157173
}
158174

159175
private static Page applyFilter(Page page, Map<Integer, Domain> domains)

0 commit comments

Comments
 (0)