Skip to content

Commit 4e80bf3

Browse files
committed
test
1 parent 343a4a2 commit 4e80bf3

File tree

4 files changed

+68
-4
lines changed

4 files changed

+68
-4
lines changed

src/query/service/src/servers/http/v1/http_query_handlers.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1005,7 +1005,7 @@ impl Header for BodyFormat {
10051005
{
10061006
if let Some(v) = values.next() {
10071007
match v.to_str() {
1008-
Ok("application/vnd.apache.arrow.file") => return Ok(BodyFormat::Arrow),
1008+
Ok(s) if s == BodyFormat::Arrow.content_type() => return Ok(BodyFormat::Arrow),
10091009
Err(_) => return Err(headers::Error::invalid()),
10101010
_ => {}
10111011
};
@@ -1019,7 +1019,7 @@ impl Header for BodyFormat {
10191019
}
10201020

10211021
impl BodyFormat {
1022-
pub fn content_type(&self) -> &'static str {
1022+
pub const fn content_type(&self) -> &'static str {
10231023
match self {
10241024
BodyFormat::Json => "application/json",
10251025
BodyFormat::Arrow => "application/vnd.apache.arrow.stream",

src/query/service/src/servers/http/v1/query/blocks_serializer.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,10 @@
1515
use std::cell::RefCell;
1616
use std::ops::DerefMut;
1717

18+
use arrow_ipc::writer::IpcWriteOptions;
1819
use arrow_ipc::writer::StreamWriter;
20+
use arrow_ipc::CompressionType;
21+
use arrow_ipc::MetadataVersion;
1922
use databend_common_exception::Result;
2023
use databend_common_expression::types::date::date_to_string;
2124
use databend_common_expression::types::interval::interval_to_string;
@@ -105,7 +108,9 @@ impl BlocksSerializer {
105108

106109
pub fn to_arrow_ipc(&self, schema: arrow_schema::Schema) -> Result<Vec<u8>> {
107110
let mut buf = Vec::new();
108-
let mut writer = StreamWriter::try_new(&mut buf, &schema)?;
111+
let opts = IpcWriteOptions::try_new(8, false, MetadataVersion::V5)?
112+
.try_with_compression(Some(CompressionType::LZ4_FRAME))?;
113+
let mut writer = StreamWriter::try_new_with_options(&mut buf, &schema, opts)?;
109114

110115
let mut data_schema = None;
111116
for (block, _) in &self.columns {

tests/nox/noxfile.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ def run_jdbc_test(session, driver_version, main_version):
6161

6262
@nox.session
6363
def test_suites(session):
64-
session.install("pytest", "requests", "pytest-asyncio")
64+
session.install("pytest", "requests", "pytest-asyncio", "pyarrow")
6565
# Usage: nox -s test_suites -- suites/1_stateful/09_http_handler/test_09_0007_session.py::test_session
6666
session.run("pytest", *session.posargs)
6767

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
import json
2+
3+
import requests
4+
import pyarrow.ipc as ipc
5+
6+
auth = ("root", "")
7+
8+
9+
def do_query(query, session, pagination):
10+
url = f"http://localhost:8000/v1/query"
11+
payload = {
12+
"sql": query,
13+
}
14+
if session:
15+
payload["session"] = session
16+
if pagination:
17+
payload["pagination"] = pagination
18+
headers = {
19+
"Accept": "application/vnd.apache.arrow.stream",
20+
"Content-Type": "application/json",
21+
}
22+
23+
return requests.post(url, headers=headers, json=payload, auth=auth)
24+
25+
26+
def test_arrow_ipc():
27+
pagination = {
28+
"max_rows_per_page": 20,
29+
}
30+
resp = do_query("select * from numbers(97)", session=None, pagination=pagination)
31+
32+
# print("content", len(resp.content))
33+
# IpcWriteOptions(alignment 64 compression None) content: 1672
34+
# IpcWriteOptions(alignment 8 compression lz4) content: 1448
35+
36+
rows = 0
37+
with ipc.open_stream(resp.content) as reader:
38+
header = json.loads(reader.schema.metadata[b"response_header"])
39+
assert header["error"] == None
40+
for batch in reader:
41+
rows += batch.num_rows
42+
43+
for _ in range(30):
44+
if header.get("next_uri") == None:
45+
break
46+
47+
uri = f"http://localhost:8000/{header['next_uri']}"
48+
resp = requests.get(
49+
uri, auth=auth, headers={"Accept": "application/vnd.apache.arrow.stream"}
50+
)
51+
with ipc.open_stream(resp.content) as reader:
52+
header = json.loads(reader.schema.metadata[b"response_header"])
53+
assert header["error"] == None
54+
for batch in reader:
55+
rows += batch.num_rows
56+
if rows < 96:
57+
assert batch.num_rows == 20
58+
59+
assert rows == 97

0 commit comments

Comments
 (0)