
Commit 086a93d

fixup! Add Lance connector
1 parent 164a875 commit 086a93d

57 files changed: 665 additions, 739 deletions

.github/workflows/ci.yml

Lines changed: 3 additions & 0 deletions
@@ -376,6 +376,7 @@ jobs:
             !:trino-jdbc,
             !:trino-kafka,
             !:trino-lakehouse,
+            !:trino-lance,
             !:trino-main,
             !:trino-mariadb,
             !:trino-memory,
@@ -499,6 +500,7 @@ jobs:
           - { modules: plugin/trino-ignite }
           - { modules: plugin/trino-kafka }
           - { modules: plugin/trino-lakehouse }
+          - { modules: plugin/trino-lance }
           - { modules: plugin/trino-mariadb }
           - { modules: plugin/trino-mongodb }
           - { modules: plugin/trino-mysql }
@@ -922,6 +924,7 @@ jobs:
           - suite-storage-formats-detailed
           - suite-parquet
           - suite-oauth2
+          - suite-lance
           - suite-ldap
           - suite-loki
           - suite-compatibility

core/trino-server/src/main/provisio/trino.xml

Lines changed: 6 additions & 0 deletions
@@ -157,6 +157,12 @@
         </artifact>
     </artifactSet>

+    <artifactSet to="plugin/lance">
+        <artifact id="${project.groupId}:trino-lance:zip:${project.version}">
+            <unpack />
+        </artifact>
+    </artifactSet>
+
     <artifactSet to="plugin/loki">
         <artifact id="${project.groupId}:trino-loki:zip:${project.version}">
             <unpack />

docs/src/main/sphinx/connector.md

Lines changed: 1 addition & 0 deletions
@@ -26,6 +26,7 @@ Ignite <connector/ignite>
 JMX <connector/jmx>
 Kafka <connector/kafka>
 Lakehouse <connector/lakehouse>
+Lance <connector/lance>
 Loki <connector/loki>
 MariaDB <connector/mariadb>
 Memory <connector/memory>
docs/src/main/sphinx/connector/lance.md

Lines changed: 46 additions & 0 deletions

@@ -0,0 +1,46 @@
+# Lance connector
+
+## General configuration
+
+To configure the Lance connector, create a catalog properties file `etc/catalog/example.properties` with the following content:
+
+```text
+connector.name=lance
+```
+
+You must configure a [namespace](https://lancedb.github.io/lance/format/namespace/) type.
+Currently, only the [directory namespace](https://lancedb.github.io/lance/format/namespace/impls/dir/) is supported:
+
+```text
+lance.namespace.type=directory
+```
+
+## Lance namespace configuration
+
+### Directory namespace
+The Lance directory namespace is a lightweight, single-level namespace that contains only a list of tables. All tables reside in the default namespace.
+
+The following configuration properties are available:
+
+:::{list-table}
+:widths: 30, 10, 60
+:header-rows: 1
+
+* - Property name
+  - Required
+  - Description
+* - `lance.namespace.directory.warehouse.location`
+  - Yes
+  - The root directory URI of the namespace where tables are stored.
+:::
+
+## File system access configuration
+
+The connector supports accessing the following file systems:
+
+* [](/object-storage/file-system-azure)
+* [](/object-storage/file-system-gcs)
+* [](/object-storage/file-system-s3)
+* [](/object-storage/file-system-hdfs)
+
+You must enable and configure the specific file system access.
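
Taken together, a complete catalog file for the directory namespace might look like the following sketch. The catalog name `example` and the warehouse URI are placeholders, and the matching file system support (S3 in this example) still has to be enabled and configured separately:

```text
connector.name=lance
lance.namespace.type=directory
lance.namespace.directory.warehouse.location=s3://example-bucket/lance/
```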

lib/trino-lance-file/pom.xml

Lines changed: 2 additions & 5 deletions
@@ -13,6 +13,7 @@
     <description>Trino - Lance file format</description>

     <properties>
+        <air.compiler.fail-warnings>true</air.compiler.fail-warnings>
         <air.test.jvm.additional-arguments>${air.test.jvm.additional-arguments.default}
             --add-opens=java.base/java.nio=ALL-UNNAMED
             --sun-misc-unsafe-memory-access=allow</air.test.jvm.additional-arguments>
@@ -24,11 +25,6 @@
             <artifactId>lancedb_lance_protocolbuffers_java</artifactId>
         </dependency>

-        <dependency>
-            <groupId>com.fasterxml.jackson.core</groupId>
-            <artifactId>jackson-annotations</artifactId>
-        </dependency>
-
         <dependency>
             <groupId>com.google.errorprone</groupId>
             <artifactId>error_prone_annotations</artifactId>
@@ -47,6 +43,7 @@
         <dependency>
             <groupId>io.github.luohao</groupId>
             <artifactId>fastlanes-java</artifactId>
+            <version>1</version>
         </dependency>

         <dependency>

lib/trino-lance-file/src/main/java/io/trino/lance/file/LanceReader.java

Lines changed: 16 additions & 13 deletions
@@ -22,7 +22,7 @@
 import io.trino.lance.file.v2.metadata.Field;
 import io.trino.lance.file.v2.metadata.FileVersion;
 import io.trino.lance.file.v2.metadata.Footer;
-import io.trino.lance.file.v2.metadata.TypeUtil;
+import io.trino.lance.file.v2.metadata.LanceTypeUtil;
 import io.trino.lance.file.v2.reader.ColumnReader;
 import io.trino.lance.file.v2.reader.Range;
 import io.trino.memory.context.AggregatedMemoryContext;
@@ -82,7 +82,7 @@ public LanceReader(LanceDataSource dataSource,
         // read Global Buffer Offset Table
         Slice bufferOffsetTableSlice = dataSource.readFully(footer.getGlobalBuffOffsetStart(), footer.getNumGlobalBuffers() * BUFFER_DESCRIPTOR_SIZE);
         List<DiskRange> bufferOffsets = IntStream.range(0, footer.getNumGlobalBuffers()).boxed()
-                .map(i -> DiskRange.of(bufferOffsetTableSlice.getLong(BUFFER_DESCRIPTOR_SIZE * i), bufferOffsetTableSlice.getLong(BUFFER_DESCRIPTOR_SIZE * i + 8)))
+                .map(i -> new DiskRange(bufferOffsetTableSlice.getLong(BUFFER_DESCRIPTOR_SIZE * i), bufferOffsetTableSlice.getLong(BUFFER_DESCRIPTOR_SIZE * i + 8)))
                 .collect(toImmutableList());
         if (bufferOffsets.size() == 0) {
             throw new RuntimeException("File did not contain any buffers");
@@ -91,40 +91,43 @@ public LanceReader(LanceDataSource dataSource,
         // read global schema
         DiskRange schemaBufferLocation = bufferOffsets.get(0);
         // prefetch all metadata
-        Slice metadataSlice = dataSource.readTail(toIntExact(dataSource.getEstimatedSize() - schemaBufferLocation.getPosition()));
+        Slice metadataSlice = dataSource.readTail(toIntExact(dataSource.getEstimatedSize() - schemaBufferLocation.position()));
         // read file descriptor
-        Slice schemaSlice = metadataSlice.slice(0, toIntExact(schemaBufferLocation.getLength()));
+        Slice schemaSlice = metadataSlice.slice(0, toIntExact(schemaBufferLocation.length()));
         build.buf.gen.lance.file.FileDescriptor fileDescriptor = build.buf.gen.lance.file.FileDescriptor.parseFrom(schemaSlice.toByteBuffer());
         checkArgument(fileDescriptor.hasSchema(), "FileDescriptor does not contain a schema");
         this.fields = toFields(fileDescriptor.getSchema());
-        List<Range> ranges = requestRanges.orElse(ImmutableList.of(Range.of(0, fileDescriptor.getLength())));
+        List<Range> ranges = requestRanges.orElse(ImmutableList.of(new Range(0, fileDescriptor.getLength())));
         this.numRows = ranges.stream()
                 .mapToLong(Range::length)
                 .sum();
         // read Column Metadata Offset Table
-        Slice columnMetadataOffsetsSlice = metadataSlice.slice(toIntExact(footer.getColumnMetadataOffsetsStart() - schemaBufferLocation.getPosition()), toIntExact(footer.getGlobalBuffOffsetStart() - footer.getColumnMetadataOffsetsStart()));
+        Slice columnMetadataOffsetsSlice = metadataSlice.slice(toIntExact(footer.getColumnMetadataOffsetsStart() - schemaBufferLocation.position()), toIntExact(footer.getGlobalBuffOffsetStart() - footer.getColumnMetadataOffsetsStart()));
         List<DiskRange> columnMetadataOffsets = IntStream.range(0, footer.getNumColumns()).boxed()
-                .map(i -> DiskRange.of(columnMetadataOffsetsSlice.getLong(i * BUFFER_DESCRIPTOR_SIZE), columnMetadataOffsetsSlice.getLong(i * BUFFER_DESCRIPTOR_SIZE + 8)))
+                .map(i -> {
+                    long position = columnMetadataOffsetsSlice.getLong(i * BUFFER_DESCRIPTOR_SIZE);
+                    return new DiskRange(position, columnMetadataOffsetsSlice.getLong(i * BUFFER_DESCRIPTOR_SIZE + 8));
+                })
                 .collect(toImmutableList());

         // read Column Metadata
-        Slice columnMetadataSlice = metadataSlice.slice(toIntExact(footer.getColumnMetadataStart() - schemaBufferLocation.getPosition()), toIntExact(footer.getColumnMetadataOffsetsStart() - footer.getColumnMetadataStart()));
+        Slice columnMetadataSlice = metadataSlice.slice(toIntExact(footer.getColumnMetadataStart() - schemaBufferLocation.position()), toIntExact(footer.getColumnMetadataOffsetsStart() - footer.getColumnMetadataStart()));
         List<ColumnMetadata> metadata = IntStream.range(0, footer.getNumColumns()).boxed()
                 .map(i -> {
                     DiskRange offset = columnMetadataOffsets.get(i);
-                    Slice message = columnMetadataSlice.slice(toIntExact(offset.getPosition() - footer.getColumnMetadataStart()), toIntExact(offset.getLength()));
+                    Slice message = columnMetadataSlice.slice(toIntExact(offset.position() - footer.getColumnMetadataStart()), toIntExact(offset.length()));
                     return ColumnMetadata.from(i, message);
                 })
                 .collect(toImmutableList());

-        Map<Integer, Integer> fieldIdMap = TypeUtil.visit(fields, new TypeUtil.FieldIdToColumnIndexVisitor());
+        Map<Integer, Integer> fieldIdMap = LanceTypeUtil.visit(fields, new LanceTypeUtil.FieldIdToColumnIndexVisitor());
         this.columnMetadata = fieldIdMap.entrySet().stream()
                 .collect(toImmutableMap(
                         Map.Entry::getKey,
                         entry -> metadata.get(entry.getValue())));

         this.columnReaders = fields.stream()
-                .filter(field -> columnIds.contains(field.getId()))
+                .filter(field -> columnIds.contains(field.id()))
                 .map(field ->
                         ColumnReader.createColumnReader(
                                 dataSource,
@@ -149,7 +152,7 @@ public static List<Field> toFields(List<build.buf.gen.lance.file.Field> fieldsPr
         }
         List<Field> fields = new ArrayList<>();
         for (Map.Entry<Integer, Field> entry : fieldMap.entrySet()) {
-            int parentId = fieldMap.get(entry.getKey()).getParentId();
+            int parentId = fieldMap.get(entry.getKey()).parentId();
             Field field = entry.getValue();
             if (parentId == -1) {
                 fields.add(field);
@@ -299,7 +302,7 @@ public Block getBlock(int channel)

         Block block = blocks[channel];
         if (block == null) {
-            block = columnReaders[channel].read().getBlock();
+            block = columnReaders[channel].read().block();
             block = selectedPositions.apply(block);
         }
         blocks[channel] = block;
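
The `DiskRange` call sites above move from a static factory and getters (`DiskRange.of`, `getPosition()`, `getLength()`) to a canonical constructor and record accessors, which suggests `DiskRange` itself was converted to a record in this commit, like `AllNullLayout` and `ColumnMetadata` below. A minimal sketch of the assumed shape; the validation is illustrative, not taken from the commit:

```java
import static com.google.common.base.Preconditions.checkArgument;

// Assumed shape of the refactored DiskRange: a record whose
// position() and length() accessors match the call sites above.
public record DiskRange(long position, long length)
{
    public DiskRange
    {
        // illustrative sanity checks; the real class may differ
        checkArgument(position >= 0, "position is negative: %s", position);
        checkArgument(length >= 0, "length is negative: %s", length);
    }
}
```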

lib/trino-lance-file/src/main/java/io/trino/lance/file/MemoryLanceDataSource.java

Lines changed: 0 additions & 88 deletions
This file was deleted.

lib/trino-lance-file/src/main/java/io/trino/lance/file/v2/metadata/AllNullLayout.java

Lines changed: 5 additions & 7 deletions
@@ -13,18 +13,16 @@
  */
 package io.trino.lance.file.v2.metadata;

-import java.util.List;
+import com.google.common.collect.ImmutableList;

-import static java.util.Objects.requireNonNull;
+import java.util.List;

-public final class AllNullLayout
+public record AllNullLayout(List<RepDefLayer> layers)
         implements PageLayout
 {
-    public final List<RepDefLayer> layers;
-
-    public AllNullLayout(List<RepDefLayer> layers)
+    public AllNullLayout
     {
-        this.layers = requireNonNull(layers, "layers is null");
+        layers = ImmutableList.copyOf(layers);
     }

     public static AllNullLayout fromProto(build.buf.gen.lance.encodings21.AllNullLayout proto)
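
Both record conversions in this commit (here and in `ColumnMetadata` below) reassign the list component inside a compact constructor. That stores an immutable snapshot, so later mutation by the caller cannot leak into the record, and `ImmutableList.copyOf` throws on null input, subsuming the removed `requireNonNull` check. A self-contained sketch of the pattern; the `Example` record is hypothetical, not part of the commit:

```java
import com.google.common.collect.ImmutableList;

import java.util.ArrayList;
import java.util.List;

public class DefensiveCopyDemo
{
    // Same pattern as AllNullLayout and ColumnMetadata: the compact
    // constructor replaces the component with an immutable copy.
    record Example(List<String> values)
    {
        Example
        {
            values = ImmutableList.copyOf(values);
        }
    }

    public static void main(String[] args)
    {
        List<String> input = new ArrayList<>(List.of("a", "b"));
        Example example = new Example(input);
        input.clear(); // caller mutation does not leak into the record
        System.out.println(example.values()); // prints [a, b]
    }
}
```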

lib/trino-lance-file/src/main/java/io/trino/lance/file/v2/metadata/ColumnMetadata.java

Lines changed: 13 additions & 28 deletions
@@ -23,19 +23,13 @@

 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.collect.ImmutableList.toImmutableList;
-import static java.util.Objects.requireNonNull;

-public class ColumnMetadata
+public record ColumnMetadata(int index, List<PageMetadata> pages, List<DiskRange> bufferOffsets)
 {
-    private final int index;
-    private final List<PageMetadata> pages;
-    private final List<DiskRange> bufferOffsets;
-
-    public ColumnMetadata(int index, List<PageMetadata> pages, List<DiskRange> bufferOffsets)
+    public ColumnMetadata
     {
-        this.index = index;
-        this.pages = requireNonNull(pages, "pages is null");
-        this.bufferOffsets = requireNonNull(bufferOffsets, "bufferOffsets is null");
+        pages = ImmutableList.copyOf(pages);
+        bufferOffsets = ImmutableList.copyOf(bufferOffsets);
     }

     public static ColumnMetadata from(int columnIndex, Slice data)
@@ -56,15 +50,21 @@ public static ColumnMetadata from(int columnIndex, Slice data)
                     long priority = page.getPriority();
                     int bufferCount = page.getBufferOffsetsList().size();
                     List<DiskRange> buffers = IntStream.range(0, bufferCount).boxed()
-                            .map(i -> DiskRange.of(page.getBufferOffsets(i), page.getBufferSizes(i)))
+                            .map(i -> {
+                                long position = page.getBufferOffsets(i);
+                                return new DiskRange(position, page.getBufferSizes(i));
+                            })
                             .collect(toImmutableList());
                     return new PageMetadata(numRows, priority, getPageLayout(page), buffers);
                 })
                 .collect(toImmutableList());

         int bufferCount = proto.getBufferOffsetsList().size();
-        ImmutableList<DiskRange> buffers = IntStream.range(0, bufferCount).boxed()
-                .map(index -> DiskRange.of(proto.getBufferOffsets(index), proto.getBufferSizes(index)))
+        List<DiskRange> buffers = IntStream.range(0, bufferCount).boxed()
+                .map(index -> {
+                    long position = proto.getBufferOffsets(index);
+                    return new DiskRange(position, proto.getBufferSizes(index));
+                })
                 .collect(toImmutableList());
         return new ColumnMetadata(columnIndex, pages, buffers);
     }
@@ -88,19 +88,4 @@ private static PageLayout getPageLayout(build.buf.gen.lance.file.v2.ColumnMetada
             default -> throw new UnsupportedOperationException("Invalid encoding: " + encoding);
         };
     }
-
-    public int getIndex()
-    {
-        return index;
-    }
-
-    public List<PageMetadata> getPages()
-    {
-        return pages;
-    }
-
-    public List<DiskRange> getBufferOffsets()
-    {
-        return bufferOffsets;
-    }
 }
