Skip to content

Commit

Permalink
new: Propagate timeout to grpc default options
Browse files Browse the repository at this point in the history
new: Added missing timeouts
  • Loading branch information
hh-space-invader committed Nov 13, 2024
1 parent f8cd2d1 commit bc85c04
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 30 deletions.
32 changes: 17 additions & 15 deletions qdrant_client/async_qdrant_remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,10 @@ def __init__(
self._rest_args["limits"] = limits
if self._timeout is not None:
self._rest_args["timeout"] = self._timeout
if self._grpc_options:
self._grpc_options["timeout"] = self._timeout
else:
self._grpc_options = {"timeout": self._timeout}
if self._auth_token_provider is not None:
if self._scheme == "http":
warnings.warn("Auth token provider is used with an insecure connection.")
Expand Down Expand Up @@ -393,7 +397,7 @@ async def search(
sparse_indices=sparse_indices,
shard_key_selector=shard_key_selector,
),
timeout=timeout if timeout is None else self._timeout,
timeout=timeout if timeout is not None else self._timeout,
)
return [GrpcToRest.convert_scored_point(hit) for hit in res.result]
else:
Expand Down Expand Up @@ -436,8 +440,6 @@ async def query_points(
types.Query,
types.NumpyArray,
types.Document,
types.Image,
types.InferenceObject,
None,
] = None,
using: Optional[str] = None,
Expand Down Expand Up @@ -497,7 +499,7 @@ async def query_points(
shard_key_selector=shard_key_selector,
read_consistency=consistency,
),
timeout=timeout if timeout is None else self._timeout,
timeout=timeout if timeout is not None else self._timeout,
)
scored_points = [GrpcToRest.convert_scored_point(hit) for hit in res.result]
return models.QueryResponse(points=scored_points)
Expand Down Expand Up @@ -605,8 +607,6 @@ async def query_points_groups(
types.Query,
types.NumpyArray,
types.Document,
types.Image,
types.InferenceObject,
None,
] = None,
using: Optional[str] = None,
Expand Down Expand Up @@ -674,7 +674,7 @@ async def query_points_groups(
shard_key_selector=shard_key_selector,
read_consistency=consistency,
),
timeout=timeout if timeout is None else self._timeout,
timeout=timeout if timeout is not None else self._timeout,
)
).result
return GrpcToRest.convert_groups_result(result)
Expand Down Expand Up @@ -1429,7 +1429,7 @@ async def scroll(
shard_key_selector=shard_key_selector,
timeout=timeout,
),
timeout=timeout if timeout is None else self._timeout,
timeout=timeout if timeout is not None else self._timeout,
)
return (
[GrpcToRest.convert_retrieved_point(point) for point in res.result],
Expand Down Expand Up @@ -1488,7 +1488,7 @@ async def count(
shard_key_selector=shard_key_selector,
timeout=timeout,
),
timeout=timeout if timeout is None else self._timeout,
timeout=timeout if timeout is not None else self._timeout,
)
).result
return GrpcToRest.convert_count_result(response)
Expand Down Expand Up @@ -1755,7 +1755,7 @@ async def retrieve(
shard_key_selector=shard_key_selector,
timeout=timeout,
),
timeout=timeout if timeout is None else self._timeout,
timeout=timeout if timeout is not None else self._timeout,
)
).result
assert result is not None, "Retrieve returned None result"
Expand Down Expand Up @@ -2188,7 +2188,7 @@ async def update_collection_aliases(
return (
await self.grpc_collections.UpdateAliases(
grpc.ChangeAliases(timeout=timeout, actions=change_aliases_operation),
timeout=self._timeout,
timeout=timeout if timeout is not None else self._timeout,
)
).result
change_aliases_operation = [
Expand Down Expand Up @@ -2336,7 +2336,7 @@ async def update_collection(
quantization_config=quantization_config,
sparse_vectors_config=sparse_vectors_config,
),
timeout=self._timeout,
timeout=timeout if timeout is not None else self._timeout,
)
).result
if isinstance(optimizers_config, grpc.OptimizersConfigDiff):
Expand Down Expand Up @@ -2372,7 +2372,8 @@ async def delete_collection(
if self._prefer_grpc:
return (
await self.grpc_collections.Delete(
grpc.DeleteCollection(collection_name=collection_name), timeout=self._timeout
grpc.DeleteCollection(collection_name=collection_name),
timeout=timeout if timeout is not None else self._timeout,
)
).result
result: Optional[bool] = (
Expand Down Expand Up @@ -2543,6 +2544,7 @@ def _upload_collection(
"metadata": self._grpc_headers,
"wait": wait,
"shard_key_selector": shard_key_selector,
"options": self._grpc_options,
}
else:
updater_kwargs = {
Expand Down Expand Up @@ -2948,7 +2950,7 @@ async def create_shard_key(
placement=placement or [],
),
),
timeout=self._timeout,
timeout=timeout if timeout is not None else self._timeout,
)
).result
else:
Expand Down Expand Up @@ -2984,7 +2986,7 @@ async def delete_shard_key(
timeout=timeout,
request=grpc.DeleteShardKey(shard_key=shard_key),
),
timeout=self._timeout,
timeout=timeout if timeout is not None else self._timeout,
)
).result
else:
Expand Down
4 changes: 2 additions & 2 deletions qdrant_client/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,18 +201,18 @@ async def intercept_call(


def parse_channel_options(options: Optional[Dict[str, Any]] = None) -> List[Tuple[str, Any]]:
timeout = f'{str(options.copy().pop("timeout", "10"))}s' if options else "10s"
default_options: List[Tuple[str, Any]] = [
("grpc.max_send_message_length", -1),
("grpc.max_receive_message_length", -1),
("grpc.enable_retries", 1),
(
"grpc.service_config",
json.dumps(
{
"methodConfig": [
{
"name": [{}],
"timeout": "10s", # TODO: propagate timeout
"timeout": timeout,
},
]
}
Expand Down
4 changes: 0 additions & 4 deletions qdrant_client/qdrant_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1545,7 +1545,6 @@ def upsert(
wait: bool = True,
ordering: Optional[types.WriteOrdering] = None,
shard_key_selector: Optional[types.ShardKeySelector] = None,
timeout: Optional[int] = None,
**kwargs: Any,
) -> types.UpdateResult:
"""
Expand Down Expand Up @@ -1603,7 +1602,6 @@ def upsert(
wait=wait,
ordering=ordering,
shard_key_selector=shard_key_selector,
timeout=timeout,
**kwargs,
)

Expand Down Expand Up @@ -1774,7 +1772,6 @@ def delete(
wait: bool = True,
ordering: Optional[types.WriteOrdering] = None,
shard_key_selector: Optional[types.ShardKeySelector] = None,
timeout: Optional[int] = None,
**kwargs: Any,
) -> types.UpdateResult:
"""Deletes selected points from collection
Expand Down Expand Up @@ -1812,7 +1809,6 @@ def delete(
wait=wait,
ordering=ordering,
shard_key_selector=shard_key_selector,
timeout=timeout,
**kwargs,
)

Expand Down
21 changes: 12 additions & 9 deletions qdrant_client/qdrant_remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,10 @@ def __init__(

if self._timeout is not None:
self._rest_args["timeout"] = self._timeout
if self._grpc_options:
self._grpc_options["timeout"] = self._timeout
else:
self._grpc_options = {"timeout": self._timeout}

if self._auth_token_provider is not None:
if self._scheme == "http":
Expand Down Expand Up @@ -1879,7 +1883,6 @@ def upsert(
wait: bool = True,
ordering: Optional[types.WriteOrdering] = None,
shard_key_selector: Optional[types.ShardKeySelector] = None,
timeout: Optional[int] = None,
**kwargs: Any,
) -> types.UpdateResult:
if self._prefer_grpc:
Expand Down Expand Up @@ -1923,7 +1926,7 @@ def upsert(
ordering=ordering,
shard_key_selector=shard_key_selector,
),
timeout=timeout if timeout is not None else self._timeout,
timeout=self._timeout,
).result

assert grpc_result is not None, "Upsert returned None result"
Expand Down Expand Up @@ -2237,7 +2240,6 @@ def delete(
wait: bool = True,
ordering: Optional[types.WriteOrdering] = None,
shard_key_selector: Optional[types.ShardKeySelector] = None,
timeout: Optional[int] = None,
**kwargs: Any,
) -> types.UpdateResult:
if self._prefer_grpc:
Expand All @@ -2261,7 +2263,7 @@ def delete(
ordering=ordering,
shard_key_selector=shard_key_selector,
),
timeout=timeout if timeout is not None else self._timeout,
timeout=self._timeout,
).result
)
else:
Expand Down Expand Up @@ -2534,7 +2536,7 @@ def update_collection_aliases(
timeout=timeout,
actions=change_aliases_operation,
),
timeout=self._timeout,
timeout=timeout if timeout is not None else self._timeout,
).result

change_aliases_operation = [
Expand Down Expand Up @@ -2680,7 +2682,7 @@ def update_collection(
quantization_config=quantization_config,
sparse_vectors_config=sparse_vectors_config,
),
timeout=self._timeout,
timeout=timeout if timeout is not None else self._timeout,
).result

if isinstance(optimizers_config, grpc.OptimizersConfigDiff):
Expand Down Expand Up @@ -2719,7 +2721,7 @@ def delete_collection(
if self._prefer_grpc:
return self.grpc_collections.Delete(
grpc.DeleteCollection(collection_name=collection_name),
timeout=self._timeout,
timeout=timeout if timeout is not None else self._timeout,
).result

result: Optional[bool] = self.http.collections_api.delete_collection(
Expand Down Expand Up @@ -2910,6 +2912,7 @@ def _upload_collection(
"metadata": self._grpc_headers,
"wait": wait,
"shard_key_selector": shard_key_selector,
"options": self._grpc_options,
}
else:
updater_kwargs = {
Expand Down Expand Up @@ -3328,7 +3331,7 @@ def create_shard_key(
placement=placement or [],
),
),
timeout=self._timeout,
timeout=timeout if timeout is not None else self._timeout,
).result
else:
result = self.openapi_client.distributed_api.create_shard_key(
Expand Down Expand Up @@ -3363,7 +3366,7 @@ def delete_shard_key(
shard_key=shard_key,
),
),
timeout=self._timeout,
timeout=timeout if timeout is not None else self._timeout,
).result
else:
result = self.openapi_client.distributed_api.delete_shard_key(
Expand Down

0 comments on commit bc85c04

Please sign in to comment.