diff --git a/docs/reference/offline-stores/hybrid_offline.md b/docs/reference/offline-stores/hybrid_offline.md
new file mode 100644
index 00000000000..1aad27bf4d8
--- /dev/null
+++ b/docs/reference/offline-stores/hybrid_offline.md
@@ -0,0 +1,226 @@
# Hybrid Offline Store

## Overview

The Hybrid Offline Store is a specialized store that routes operations to different underlying offline stores based on the data source type. This enables you to use multiple types of data sources (e.g., BigQuery, Redshift, Snowflake, File) within the same Feast deployment.

## When to Use

Consider using the Hybrid Offline Store when:

- You have data spread across multiple data platforms
- You want to gradually migrate from one data source to another
- Different teams in your organization use different data storage technologies
- You need to optimize cost by using specialized stores for specific workloads

## Configuration

### Setting Up the Hybrid Offline Store

To use the Hybrid Offline Store, configure it in your `feature_store.yaml` file. Each entry in `offline_stores` nests its store-specific settings under `conf`, matching the `HybridOfflineStoreConfig` schema:

```yaml
project: my_project
registry: registry.db
provider: local
offline_store:
  type: hybrid_offline_store.HybridOfflineStore
  offline_stores:
    - type: bigquery
      conf:
        dataset: feast_dataset
        project_id: gcp_project_id
    - type: redshift
      conf:
        cluster_id: my_redshift_cluster
        region: us-west-2
        user: admin
        database: feast
        s3_staging_location: s3://feast-bucket/staging
    - type: file
      conf:
        path: /data/feast
```

### Supported Offline Stores

The Hybrid Offline Store supports all of Feast's offline stores:

- BigQuery
- Redshift
- Snowflake
- File (Parquet, CSV)
- Postgres
- Spark
- Trino
- Custom offline stores

## Usage

### Defining Feature Views with Different Source Types

When using the Hybrid Offline Store, you can define feature views with different source types:

```python
# BigQuery source
bq_source = BigQuerySource(
    table="my_table",
event_timestamp_column="timestamp", + created_timestamp_column="created_ts", +) + +# File source +file_source = FileSource( + path="/data/transactions.parquet", + event_timestamp_column="timestamp", + created_timestamp_column="created_ts", +) + +# Define feature views with different sources +driver_stats_bq = FeatureView( + name="driver_stats_bq", + entities=[driver], + ttl=timedelta(days=1), + source=bq_source, + schema=[ + Field(name="conv_rate", dtype=Float32), + Field(name="acc_rate", dtype=Float32), + Field(name="avg_daily_trips", dtype=Int32), + ], +) + +driver_stats_file = FeatureView( + name="driver_stats_file", + entities=[driver], + ttl=timedelta(days=1), + source=file_source, + schema=[ + Field(name="conv_rate", dtype=Float32), + Field(name="acc_rate", dtype=Float32), + Field(name="avg_daily_trips", dtype=Int32), + ], +) +``` + +### How Routing Works + +The Hybrid Offline Store routes operations to the appropriate underlying store based on the source type: + +1. The source type is determined by examining the class name of the data source (e.g., `BigQuerySource`, `FileSource`). +2. The source store type is extracted from the class name (e.g., `bigquery`, `file`). +3. The operation is delegated to the matching offline store configuration. + +## Limitations + +- All feature views used in a single `get_historical_features` call must have the same source type. +- Custom data sources must follow the naming convention `{SourceType}Source` (e.g., `BigQuerySource`, `FileSource`). +- Each offline store configuration must have a unique `type` value. + +## Implementation Details + +The Hybrid Offline Store acts as a router, delegating operations to the appropriate underlying store. 
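The source-type derivation used for this routing can be sketched in plain Python — the class below is a stand-in for the real Feast data source, purely for illustration:

```python
class BigQuerySource:
    """Stand-in for a Feast data source class; illustrative only."""

def source_store_type(data_source) -> str:
    # The store key is the data source's class name, lowercased,
    # with the trailing "source" removed (e.g. BigQuerySource -> bigquery).
    return type(data_source).__name__.lower().replace("source", "")

print(source_store_type(BigQuerySource()))  # -> bigquery
```

The derived key (`bigquery` here) is then matched against the `type` of each entry in the `offline_stores` list.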
Key operations that are routed include: + +- `get_historical_features`: Retrieves historical feature values for training or batch scoring +- `pull_latest_from_table_or_query`: Pulls the latest feature values from a table or query +- `pull_all_from_table_or_query`: Pulls all feature values from a table or query for a specified time range +- `offline_write_batch`: Writes a batch of feature values to the offline store + +## Troubleshooting + +### Common Issues + +#### Feature Views with Different Source Types + +If you encounter an error like: +``` +ValueError: All feature views must have the same source type +``` + +This means you're trying to retrieve historical features for feature views with different source types in a single call. Split the call into multiple calls, one per source type. + +#### Offline Store Configuration Not Found + +If you encounter an error like: +``` +ValueError: No offline store configuration found for source type 'X' +``` + +Make sure you've included an offline store configuration for each source type you're using. 
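For the first issue above, one way to split a mixed request is to group the requested feature views by source type first, then issue one `get_historical_features` call per group. This sketch uses stand-in classes rather than real Feast objects:

```python
from collections import defaultdict

# Stand-ins for Feast classes; illustrative only.
class BigQuerySource: ...
class FileSource: ...

class FeatureView:
    def __init__(self, name, batch_source):
        self.name = name
        self.batch_source = batch_source

def group_by_source_type(feature_views):
    """Group feature view names by batch-source class so that each
    group can go into its own get_historical_features call."""
    groups = defaultdict(list)
    for fv in feature_views:
        groups[type(fv.batch_source).__name__].append(fv.name)
    return dict(groups)

views = [
    FeatureView("driver_stats_bq", BigQuerySource()),
    FeatureView("driver_stats_file", FileSource()),
]
print(group_by_source_type(views))
# -> {'BigQuerySource': ['driver_stats_bq'], 'FileSource': ['driver_stats_file']}
```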
+ +## Examples + +### Complete Feature Repository Example + +```python +from datetime import timedelta +from feast import Entity, FeatureView, Field, FileSource, BigQuerySource, FeatureStore +from feast.types import Float32, Int32, Int64 + +# Define an entity +driver = Entity( + name="driver_id", + value_type=Int64, + description="Driver ID", +) + +# BigQuery source +bq_source = BigQuerySource( + table="my_project.my_dataset.driver_stats", + event_timestamp_column="event_timestamp", +) + +# File source +file_source = FileSource( + path="/data/driver_activity.parquet", + event_timestamp_column="event_timestamp", +) + +# Feature view with BigQuery source +driver_stats_bq = FeatureView( + name="driver_stats_bq", + entities=[driver], + ttl=timedelta(days=1), + source=bq_source, + schema=[ + Field(name="conv_rate", dtype=Float32), + Field(name="acc_rate", dtype=Float32), + Field(name="avg_daily_trips", dtype=Int32), + ], +) + +# Feature view with File source +driver_activity_file = FeatureView( + name="driver_activity_file", + entities=[driver], + ttl=timedelta(days=1), + source=file_source, + schema=[ + Field(name="active_hours", dtype=Float32), + Field(name="driving_days", dtype=Int32), + ], +) + +# Apply feature views +fs = FeatureStore(repo_path="../../how-to-guides/customizing-feast") +fs.apply([driver, driver_stats_bq, driver_activity_file]) + +# Get historical features from BigQuery source +training_df_bq = fs.get_historical_features( + entity_df=entity_df, + features=[ + "driver_stats_bq:conv_rate", + "driver_stats_bq:acc_rate", + "driver_stats_bq:avg_daily_trips", + ], +).to_df() + +# Get historical features from File source +training_df_file = fs.get_historical_features( + entity_df=entity_df, + features=[ + "driver_activity_file:active_hours", + "driver_activity_file:driving_days", + ], +).to_df() +``` + +## Conclusion + +The Hybrid Offline Store provides flexibility in working with multiple data sources while maintaining the simplicity of the Feast API. 
By routing operations to the appropriate underlying store, it allows you to leverage the strengths of different data platforms within a single Feast deployment. diff --git a/docs/reference/online-stores/hybrid.md b/docs/reference/online-stores/hybrid.md new file mode 100644 index 00000000000..38527d9a66e --- /dev/null +++ b/docs/reference/online-stores/hybrid.md @@ -0,0 +1,111 @@ +# Hybrid online store + +## Description + +The HybridOnlineStore allows routing online feature operations to different online store backends based on a configurable tag (such as `tribe`, `team`, or `project`) on the FeatureView. This enables a single Feast deployment to support multiple online store backends, each configured independently and selected dynamically at runtime. + +## Getting started + +To use the HybridOnlineStore, install Feast with all required online store dependencies (e.g., Bigtable, Cassandra, etc.) for the stores you plan to use. For example: + +``` +pip install 'feast[gcp,cassandra]' +``` + +## Example + +{% code title="feature_store.yaml" %} +```yaml +project: my_feature_repo +registry: data/registry.db +provider: local +online_store: + type: hybrid_online_store.HybridOnlineStore + routing_tag: team # or any tag name you want to use in FeatureView's for routing + online_stores: + - type: bigtable + conf: + project_id: my_gcp_project + instance: my_bigtable_instance + - type: cassandra + conf: + hosts: + - cassandra1.example.com + - cassandra2.example.com + keyspace: feast_keyspace + username: feast_user + password: feast_password +``` +{% endcode %} + +### Setting the Routing Tag in FeatureView + +To enable routing, add a tag to your FeatureView that matches the `routing_tag` specified in your `feature_store.yaml`. 
For example, if your `routing_tag` is `team`, add a `team` tag to your FeatureView: + +```yaml +tags: + team: bigtable # This tag determines which online store is used +``` + +The value of this tag (e.g., `bigtable`) should match the type or identifier of the online store you want to use for this FeatureView. The HybridOnlineStore will route all online operations for this FeatureView to the corresponding backend. + +### Example FeatureView + +{% code title="feature_view" %} +```yaml +name: user_features +entities: + - name: user_id + join_keys: ["user_id"] +ttl: null +schema: + - name: age + dtype: int64 + - name: country + dtype: string +online: true +source: + path: data/user_features.parquet + event_timestamp_column: event_timestamp + created_timestamp_column: created_timestamp +tags: + team: bigtable # This tag determines which online store is used +``` +{% endcode %} + +The `team` tag in the FeatureView's `tags` field determines which online store backend is used for this FeatureView. In this example, all online operations for `user_features` will be routed to the Bigtable online store, as specified by the tag value and the `routing_tag` in your `feature_store.yaml`. + +The HybridOnlineStore will route requests to the correct online store based on the value of the tag specified by `routing_tag`. + +The full set of configuration options for each online store is available in their respective documentation: +- [BigtableOnlineStoreConfig](https://rtd.feast.dev/en/latest/#feast.infra.online_stores.bigtable.BigtableOnlineStoreConfig) +- [CassandraOnlineStoreConfig](https://rtd.feast.dev/en/master/#feast.infra.online_stores.cassandra_online_store.cassandra_online_store.CassandraOnlineStoreConfig) + +For a full explanation of configuration options, please refer to the documentation for each online store backend you configure in the `online_stores` list. + +Storage specifications can be found at [docs/specs/online_store_format.md](../../specs/online_store_format.md). 
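The routing described above can be sketched in plain Python. Dictionaries stand in for real online store instances; the key derivation mirrors how the HybridOnlineStore registers its backends, and the tag value on the FeatureView must match that derived key:

```python
def backend_key(store_type: str) -> str:
    """A configured backend is registered under the last dotted
    component of its `type`, lowercased."""
    return store_type.split(".")[-1].lower()

def select_backend(tags: dict, routing_tag: str, backends: dict):
    """Pick the backend for a FeatureView from the value of its routing tag."""
    tag_value = tags.get(routing_tag)
    if tag_value is None:
        raise ValueError(f"FeatureView must have a '{routing_tag}' tag")
    return backends[tag_value.lower()]

# Stand-in registry, as built from the `online_stores` list above.
backends = {
    backend_key("bigtable"): "<BigtableOnlineStore>",
    backend_key("cassandra"): "<CassandraOnlineStore>",
}

print(select_backend({"team": "bigtable"}, "team", backends))
# -> <BigtableOnlineStore>
```

Note that a fully qualified `type` such as `feast.infra.online_stores.bigtable.BigtableOnlineStore` yields the key `bigtableonlinestore`, so the FeatureView tag value would need to match that instead.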
+ +## Functionality Matrix + +The set of functionality supported by online stores is described in detail [here](overview.md#functionality). Below is a matrix indicating which functionality is supported by the HybridOnlineStore. + +| | HybridOnlineStore | +|-----------------------------------------------------------|-------------------| +| write feature values to the online store | yes | +| read feature values from the online store | yes | +| update infrastructure (e.g. tables) in the online store | yes | +| teardown infrastructure (e.g. tables) in the online store | yes | +| generate a plan of infrastructure changes | no | +| support for on-demand transforms | yes | +| readable by Python SDK | yes | +| readable by Java | no | +| readable by Go | no | +| support for entityless feature views | yes | +| support for concurrent writing to the same key | yes | +| support for ttl (time to live) at retrieval | no | +| support for deleting expired data | no | +| collocated by feature view | yes | +| collocated by feature service | no | +| collocated by entity key | yes | + +To compare this set of functionality against other online stores, please see the full [functionality matrix](overview.md#functionality-matrix). 
diff --git a/sdk/python/feast/infra/offline_stores/hybrid_offline_store/__init__.py b/sdk/python/feast/infra/offline_stores/hybrid_offline_store/__init__.py new file mode 100644 index 00000000000..09a6cbd3c78 --- /dev/null +++ b/sdk/python/feast/infra/offline_stores/hybrid_offline_store/__init__.py @@ -0,0 +1,6 @@ +from feast.infra.offline_stores.hybrid_offline_store.hybrid_offline_store import ( + HybridOfflineStore, + HybridOfflineStoreConfig, +) + +__all__ = ["HybridOfflineStore", "HybridOfflineStoreConfig"] diff --git a/sdk/python/feast/infra/offline_stores/hybrid_offline_store/hybrid_offline_store.py b/sdk/python/feast/infra/offline_stores/hybrid_offline_store/hybrid_offline_store.py new file mode 100644 index 00000000000..ef4cee9db05 --- /dev/null +++ b/sdk/python/feast/infra/offline_stores/hybrid_offline_store/hybrid_offline_store.py @@ -0,0 +1,249 @@ +""" +HybridOfflineStore implementation that routes operations to different offline stores +based on data source type. +""" + +from datetime import datetime +from typing import Any, Callable, Dict, List, Literal, Optional, Tuple, Union + +import pandas as pd +import pyarrow +from pydantic import Field, StrictStr + +from feast import FeatureView +from feast.data_source import DataSource +from feast.infra.offline_stores.offline_store import OfflineStore, RetrievalJob +from feast.infra.offline_stores.offline_utils import get_offline_store_from_config +from feast.infra.registry.base_registry import BaseRegistry +from feast.repo_config import ( + FeastConfigBaseModel, + RepoConfig, + get_offline_config_from_type, +) + + +class OfflineStoreConfig(FeastConfigBaseModel): + """Configuration for a single offline store backend.""" + + type: StrictStr = Field( + description="Type of offline store (e.g., 'bigquery', 'spark')" + ) + conf: Dict[str, Any] = Field( + default_factory=dict, description="Store-specific configuration" + ) + + +class HybridOfflineStoreConfig(FeastConfigBaseModel): + """Configuration for the 
HybridOfflineStore.""" + + type: Literal["HybridOfflineStore", "hybrid_offline_store.HybridOfflineStore"] = ( + "hybrid_offline_store.HybridOfflineStore" + ) + offline_stores: List[OfflineStoreConfig] = Field( + default_factory=list, description="List of offline store configurations" + ) + + +class HybridOfflineStore(OfflineStore): + """ + Routes operations to different offline stores based on data source type. + + This store acts as a router that delegates operations to the appropriate + underlying offline store based on the type of data source being accessed. + """ + + @staticmethod + def _prepare_store_config( + base_config: RepoConfig, store_config: OfflineStoreConfig + ) -> Dict[str, Any]: + """ + Prepare store configuration dictionary from base config and store-specific config. + + Args: + base_config: Base repository configuration containing common settings + store_config: Store-specific configuration for the target offline store + + Returns: + Dictionary containing the combined configuration ready for offline store + """ + config_dict = base_config.__dict__.copy() + config_dict["registry"] = config_dict["registry_config"] + config_dict["offline_store"] = store_config.conf.copy() + config_dict["offline_store"]["type"] = store_config.type + config_dict["online_store"] = config_dict["online_config"] + return config_dict + + @staticmethod + def _get_store_and_config_by_source( + repo_config: RepoConfig, data_source: DataSource + ) -> Tuple[OfflineStore, RepoConfig]: + """ + Get the appropriate offline store and configuration for a data source. 
+ + Args: + repo_config: Repository configuration + data_source: Data source to match with an offline store + + Returns: + Tuple of (offline store instance, store configuration) + + Raises: + ValueError: If no matching offline store configuration is found or if + the data source type is not valid + """ + # Get source type information + source_type = type(data_source).__name__ + source_store_type = source_type.lower().replace("source", "") + + # Find matching store configuration + store_config = next( + ( + config + for config in repo_config.offline_store.offline_stores + if config.type.lower() == source_store_type + ), + None, + ) + + if not store_config: + available_stores = [ + store.type for store in repo_config.offline_store.offline_stores + ] + raise ValueError( + f"No offline store configuration found for source type '{source_type}'. " + f"Available store types: {available_stores}" + ) + + # Create offline store instance and configuration + offline_config = get_offline_config_from_type(source_store_type)( + **store_config.conf + ) + offline_store = get_offline_store_from_config(offline_config) + + # Create a new RepoConfig with the appropriate store configuration + store_repo_config = RepoConfig( + **HybridOfflineStore._prepare_store_config(repo_config, store_config) + ) + + return offline_store, store_repo_config + + @staticmethod + def _validate_feature_views_source_type(feature_views: List[FeatureView]) -> None: + """ + Validate that all feature views have the same source type. + + This check ensures that features can be retrieved from a single offline store. + + Args: + feature_views: List of feature views to validate + + Raises: + ValueError: If feature views have different source types + """ + if len(feature_views) > 1: + source_types = {type(fv.batch_source).__name__ for fv in feature_views} + if len(source_types) > 1: + raise ValueError( + f"All feature views must have the same source type for hybrid offline store. 
" + f"Found multiple source types: {', '.join(sorted(source_types))}" + ) + + @staticmethod + def pull_latest_from_table_or_query( + config: RepoConfig, + data_source: DataSource, + join_key_columns: List[str], + feature_name_columns: List[str], + timestamp_field: str, + created_timestamp_column: Optional[str], + start_date: datetime, + end_date: datetime, + ) -> RetrievalJob: + """Route pull operation to appropriate offline store.""" + store, store_config = HybridOfflineStore._get_store_and_config_by_source( + config, data_source + ) + return store.pull_latest_from_table_or_query( + config=store_config, + data_source=data_source, + join_key_columns=join_key_columns, + feature_name_columns=feature_name_columns, + timestamp_field=timestamp_field, + created_timestamp_column=created_timestamp_column, + start_date=start_date, + end_date=end_date, + ) + + @staticmethod + def get_historical_features( + config: RepoConfig, + feature_views: List[FeatureView], + feature_refs: List[str], + entity_df: Union[pd.DataFrame, str], + registry: BaseRegistry, + project: str, + full_feature_names: bool = False, + ) -> RetrievalJob: + """Get historical features from appropriate offline store.""" + # Ensure all feature views use the same source type + HybridOfflineStore._validate_feature_views_source_type(feature_views) + + # Get store for the first feature view's source + store, store_config = HybridOfflineStore._get_store_and_config_by_source( + config, feature_views[0].batch_source + ) + + # Delegate to the appropriate store + return store.get_historical_features( + config=store_config, + feature_views=feature_views, + feature_refs=feature_refs, + entity_df=entity_df, + registry=registry, + project=project, + full_feature_names=full_feature_names, + ) + + @staticmethod + def pull_all_from_table_or_query( + config: RepoConfig, + data_source: DataSource, + join_key_columns: List[str], + feature_name_columns: List[str], + timestamp_field: str, + created_timestamp_column: Optional[str] 
= None, + start_date: Optional[datetime] = None, + end_date: Optional[datetime] = None, + ) -> RetrievalJob: + """Pull all data from appropriate offline store.""" + store, store_config = HybridOfflineStore._get_store_and_config_by_source( + config, data_source + ) + return store.pull_all_from_table_or_query( + config=store_config, + data_source=data_source, + join_key_columns=join_key_columns, + feature_name_columns=feature_name_columns, + timestamp_field=timestamp_field, + created_timestamp_column=created_timestamp_column, + start_date=start_date, + end_date=end_date, + ) + + @staticmethod + def offline_write_batch( + config: RepoConfig, + feature_view: FeatureView, + table: pyarrow.Table, + progress: Optional[Callable[[int], Any]], + ) -> None: + """Write batch data to appropriate offline store.""" + store, store_config = HybridOfflineStore._get_store_and_config_by_source( + config, feature_view.batch_source + ) + store.offline_write_batch( + config=store_config, + feature_view=feature_view, + table=table, + progress=progress, + ) diff --git a/sdk/python/feast/infra/offline_stores/hybrid_offline_store/hybrid_repo_configuration.py b/sdk/python/feast/infra/offline_stores/hybrid_offline_store/hybrid_repo_configuration.py new file mode 100644 index 00000000000..838ce233306 --- /dev/null +++ b/sdk/python/feast/infra/offline_stores/hybrid_offline_store/hybrid_repo_configuration.py @@ -0,0 +1,54 @@ +from feast.infra.offline_stores.bigquery import BigQuerySource +from feast.infra.offline_stores.file_source import FileSource +from tests.integration.feature_repos.repo_configuration import REDIS_CONFIG +from tests.integration.feature_repos.universal.data_source_creator import ( + DataSourceCreator, +) +from tests.integration.feature_repos.universal.online_store.redis import ( + RedisOnlineStoreCreator, +) + + +class HybridDataSourceCreator(DataSourceCreator): + """Creates data sources for hybrid offline store tests.""" + + def create_data_source( + self, + dataset_name, + 
timestamp_field="ts", + created_timestamp_field=None, + field_mapping=None, + date_partition_column=None, + ): + """ + Create a hybrid data source for testing that uses BigQuery and File sources. + + The source type will be chosen based on the dataset name for demonstration purposes. + In practice, the HybridOfflineStore will route to the appropriate offline store + based on the data source type. + """ + if "bigquery" in dataset_name: + return BigQuerySource( + table=f"{dataset_name}", + timestamp_field=timestamp_field, + created_timestamp_column=created_timestamp_field, + field_mapping=field_mapping or {}, + date_partition_column=date_partition_column, + ) + else: + return FileSource( + path=f"{dataset_name}.parquet", + timestamp_field=timestamp_field, + created_timestamp_column=created_timestamp_field, + field_mapping=field_mapping or {}, + date_partition_column=date_partition_column, + ) + + +# Define available offline stores for testing +AVAILABLE_OFFLINE_STORES = [ + ("local", HybridDataSourceCreator), +] + +# Define available online stores to pair with hybrid offline store +AVAILABLE_ONLINE_STORES = {"redis": (REDIS_CONFIG, RedisOnlineStoreCreator)} diff --git a/sdk/python/feast/infra/online_stores/hybrid_online_store/hybrid_online_store.py b/sdk/python/feast/infra/online_stores/hybrid_online_store/hybrid_online_store.py new file mode 100644 index 00000000000..e929e039411 --- /dev/null +++ b/sdk/python/feast/infra/online_stores/hybrid_online_store/hybrid_online_store.py @@ -0,0 +1,329 @@ +""" +hybrid_online_store.py +---------------------- + +This module provides the HybridOnlineStore, a Feast OnlineStore implementation that enables routing online feature operations +to different online stores based on a configurable tag (e.g., tribe, team, or project) on the FeatureView. This allows a single Feast deployment +to support multiple online store backends, each configured independently and selected dynamically at runtime. 
+ +Features: + - Supports multiple online store backends in a single Feast deployment. + - Routes online reads and writes to the correct backend based on a configurable tag on the FeatureView. + - Enables multi-tenancy and flexible data management strategies. + - Designed for extensibility and compatibility with Feast's OnlineStore interface. + +Usage: + 1. Add a tag (e.g., 'tribe', 'team', or any custom name) to your FeatureView. + 2. Configure multiple online stores in your Feast repo config under 'online_stores'. + 3. Set the 'routing_tag' field in your online_store config to specify which tag to use for routing. + 4. The HybridOnlineStore will route reads and writes to the correct backend based on the tag value. + +Example configuration (feature_store.yaml): + + online_store: + type: hybrid_online_store.HybridOnlineStore + routing_tag: team # or any tag name you want to use for routing + online_stores: + - type: feast.infra.online_stores.bigtable.BigtableOnlineStore + conf: + ... # bigtable config + - type: feast.infra.online_stores.contrib.cassandra_online_store.cassandra_online_store.CassandraOnlineStore + conf: + ... # cassandra config + +Example FeatureView: + + tags: + team: bigtable + +The HybridOnlineStore will route requests to the correct online store based on the value of the tag specified by 'routing_tag'. 
+""" + +from datetime import datetime +from typing import Any, Callable, Dict, List, Literal, Optional, Sequence, Tuple + +from pydantic import StrictStr + +from feast import Entity, FeatureView, RepoConfig +from feast.infra.online_stores.helpers import get_online_store_from_config +from feast.infra.online_stores.online_store import OnlineStore +from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto +from feast.protos.feast.types.Value_pb2 import Value as ValueProto +from feast.repo_config import FeastConfigBaseModel, get_online_config_from_type + + +class HybridOnlineStoreConfig(FeastConfigBaseModel): + """ + Configuration for HybridOnlineStore. + + This config allows multiple online stores to be used in a single Feast deployment. Each online store is specified by its type (Python import path) + and a configuration dictionary. The HybridOnlineStore uses this configuration to instantiate and manage the set of online stores. + + Attributes: + type: The type identifier for the HybridOnlineStore. + online_stores: A list of OnlineStoresWithConfig, each specifying the type and config for an online store backend. + """ + + type: Literal["HybridOnlineStore", "hybrid_online_store.HybridOnlineStore"] = ( + "hybrid_online_store.HybridOnlineStore" + ) + + class OnlineStoresWithConfig(FeastConfigBaseModel): + """ + Configuration for a single online store backend. + + Attributes: + type: Python import path to the online store class. + conf: Dictionary of configuration parameters for the online store. + """ + + type: StrictStr # Python import path to the online store class + conf: Dict + + online_stores: Optional[List[OnlineStoresWithConfig]] + routing_tag: StrictStr = ( + "tribe" # Configurable tag name for routing, default is 'tribe' + ) + + +class HybridOnlineStore(OnlineStore): + """ + HybridOnlineStore routes online feature operations to different online store backends + based on a tag (e.g., 'tribe') on the FeatureView. 
This enables multi-tenancy and flexible + backend selection in a single Feast deployment. + + The backend is selected dynamically at runtime according to the tag value. + """ + + def __init__(self): + """ + Initialize the HybridOnlineStore. Online stores are instantiated lazily on first use. + """ + self.online_stores = {} + self._initialized = False + + def _initialize_online_stores(self, config: RepoConfig): + """ + Lazily instantiate all configured online store backends from the repo config. + + Args: + config: Feast RepoConfig containing the online_stores configuration. + """ + if self._initialized: + return + self.online_stores = {} + online_stores_cfg = getattr(config.online_store, "online_stores", []) + for store_cfg in online_stores_cfg: + config_cls = get_online_config_from_type( + store_cfg.type.split(".")[-1].lower() + ) + config_instance = config_cls(**store_cfg.conf) + online_store_instance = get_online_store_from_config(config_instance) + self.online_stores[store_cfg.type.split(".")[-1].lower()] = ( + online_store_instance + ) + self._initialized = True + + def _get_online_store(self, tribe_tag, config: RepoConfig): + """ + Retrieve the online store backend corresponding to the given tag value. + + Args: + tribe_tag: The tag value (e.g., 'tribe') used to select the backend. + config: Feast RepoConfig. + Returns: + The OnlineStore instance for the given tag, or None if not found. + """ + self._initialize_online_stores(config) + return self.online_stores.get(tribe_tag.lower()) + + def _prepare_repo_conf(self, config: RepoConfig, online_store_type: str): + """ + Prepare a RepoConfig for the selected online store backend. + + Args: + config: The original Feast RepoConfig. + online_store_type: The type of the online store backend to use. + Returns: + A dictionary representing the updated RepoConfig for the selected backend. 
+ """ + rconfig = config + for online_store in config.online_store.online_stores: + if online_store.type.split(".")[-1].lower() == online_store_type.lower(): + rconfig.online_config = online_store.conf + rconfig.online_config["type"] = online_store.type + data = rconfig.__dict__ + data["registry"] = data["registry_config"] + data["offline_store"] = data["offline_config"] + data["online_store"] = data["online_config"] + return data + + def _get_routing_tag_value(self, table: FeatureView, config: RepoConfig): + tag_name = getattr(config.online_store, "routing_tag", "tribe") + return table.tags.get(tag_name) + + def online_write_batch( + self, + config: RepoConfig, + table: FeatureView, + odata: List[ + Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]] + ], + progress: Optional[Callable[[int], Any]], + ) -> None: + """ + Write a batch of feature rows to the appropriate online store based on the FeatureView's tag. + + Args: + config: Feast RepoConfig. + table: FeatureView to write to. Must have a tag (e.g., 'tribe') to select the backend. + odata: List of tuples containing entity key, feature values, event timestamp, and created timestamp. + progress: Optional callback for progress reporting. + Raises: + ValueError: If the FeatureView does not have the required tag. + NotImplementedError: If no online store is found for the tag value. + """ + tribe = self._get_routing_tag_value(table, config) + if not tribe: + tag_name = getattr(config.online_store, "routing_tag", "tribe") + raise ValueError( + f"FeatureView must have a '{tag_name}' tag to use HybridOnlineStore." + ) + online_store = self._get_online_store(tribe, config) + if online_store: + config = RepoConfig(**self._prepare_repo_conf(config, tribe)) + online_store.online_write_batch(config, table, odata, progress) + else: + raise NotImplementedError( + f"No online store found for {getattr(config.online_store, 'routing_tag', 'tribe')} tag '{tribe}'. Please check your configuration." 
+ ) + + @staticmethod + def write_to_table( + created_ts, cur, entity_key_bin, feature_name, project, table, timestamp, val + ): + """ + (Not implemented) Write a single feature value to the online store table. + """ + pass + + def online_read( + self, + config: RepoConfig, + table: FeatureView, + entity_keys: List[EntityKeyProto], + requested_features: Optional[List[str]] = None, + ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: + """ + Read feature rows from the appropriate online store based on the FeatureView's tag. + + Args: + config: Feast RepoConfig. + table: FeatureView to read from. Must have a tag (e.g., 'tribe') to select the backend. + entity_keys: List of entity keys to read. + requested_features: Optional list of feature names to read. + Returns: + List of tuples containing event timestamp and feature values. + Raises: + ValueError: If the FeatureView does not have the required tag. + NotImplementedError: If no online store is found for the tag value. + """ + tribe = self._get_routing_tag_value(table, config) + if not tribe: + tag_name = getattr(config.online_store, "routing_tag", "tribe") + raise ValueError( + f"FeatureView must have a '{tag_name}' tag to use HybridOnlineStore." + ) + online_store = self._get_online_store(tribe, config) + if online_store: + config = RepoConfig(**self._prepare_repo_conf(config, tribe)) + return online_store.online_read( + config, table, entity_keys, requested_features + ) + else: + raise NotImplementedError( + f"No online store found for {getattr(config.online_store, 'routing_tag', 'tribe')} tag '{tribe}'. Please check your configuration." + ) + + def update( + self, + config: RepoConfig, + tables_to_delete: Sequence[FeatureView], + tables_to_keep: Sequence[FeatureView], + entities_to_delete: Sequence[Entity], + entities_to_keep: Sequence[Entity], + partial: bool, + ): + """ + Update the state of the online stores for the given FeatureViews and Entities. + + Args: + config: Feast RepoConfig. 
+            tables_to_delete: Sequence of FeatureViews to delete.
+            tables_to_keep: Sequence of FeatureViews to keep.
+            entities_to_delete: Sequence of Entities to delete.
+            entities_to_keep: Sequence of Entities to keep.
+            partial: Whether to perform a partial update.
+        Raises:
+            ValueError: If a FeatureView does not have the required tag.
+            NotImplementedError: If no online store is found for a tag value.
+        """
+        for table in tables_to_keep:
+            tribe = self._get_routing_tag_value(table, config)
+            if not tribe:
+                tag_name = getattr(config.online_store, "routing_tag", "tribe")
+                raise ValueError(
+                    f"FeatureView must have a '{tag_name}' tag to use HybridOnlineStore."
+                )
+            online_store = self._get_online_store(tribe, config)
+            if online_store:
+                # Build a per-store config without overwriting `config`, which
+                # is still needed for routing on subsequent loop iterations.
+                store_config = RepoConfig(**self._prepare_repo_conf(config, tribe))
+                online_store.update(
+                    store_config,
+                    tables_to_delete,
+                    tables_to_keep,
+                    entities_to_delete,
+                    entities_to_keep,
+                    partial,
+                )
+            else:
+                raise NotImplementedError(
+                    f"No online store found for {getattr(config.online_store, 'routing_tag', 'tribe')} tag '{tribe}'. Please check your configuration."
+                )
+
+    def teardown(
+        self,
+        config: RepoConfig,
+        tables: Sequence[FeatureView],
+        entities: Sequence[Entity],
+    ):
+        """
+        Teardown all managed online stores for the given FeatureViews and Entities.
+
+        Args:
+            config: Feast RepoConfig.
+            tables: Sequence of FeatureViews to teardown.
+            entities: Sequence of Entities to teardown.
+        """
+        # Use a set of (tribe, store_type, conf_id) to avoid duplicate teardowns for the same instance
+        tribes_seen = set()
+        online_stores_cfg = getattr(config.online_store, "online_stores", [])
+        tag_name = getattr(config.online_store, "routing_tag", "tribe")
+        for table in tables:
+            tribe = table.tags.get(tag_name)
+            if not tribe:
+                continue
+            # Find all store configs matching this tribe (supporting multiple instances of the same type)
+            for store_cfg in online_stores_cfg:
+                store_type = store_cfg.type
+                # Use id(store_cfg.conf) to distinguish different configs of the same type
+                key = (tribe, store_type, id(store_cfg.conf))
+                if key in tribes_seen:
+                    continue
+                tribes_seen.add(key)
+                # Only select the online store if tribe matches the type (or you can add a mapping in config for more flexibility)
+                if tribe.lower() == store_type.split(".")[-1].lower():
+                    online_store = self._get_online_store(tribe, config)
+                    if online_store:
+                        # Keep `config` intact for routing on later iterations.
+                        store_config = RepoConfig(
+                            **self._prepare_repo_conf(config, tribe)
+                        )
+                        online_store.teardown(store_config, tables, entities)
diff --git a/sdk/python/feast/infra/online_stores/hybrid_online_store/hybrid_online_store_repo_configuration.py b/sdk/python/feast/infra/online_stores/hybrid_online_store/hybrid_online_store_repo_configuration.py
new file mode 100644
index 00000000000..90a65a092d0
--- /dev/null
+++ b/sdk/python/feast/infra/online_stores/hybrid_online_store/hybrid_online_store_repo_configuration.py
@@ -0,0 +1,28 @@
+# Copyright 2025 The Feast Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# This file provides integration test repo configuration for HybridOnlineStore.
+# It enables running integration tests with multiple online store backends.
+# Update this file if you add more backends or change test setup.
+
+from tests.integration.feature_repos.integration_test_repo_config import (
+    IntegrationTestRepoConfig,
+)
+from tests.integration.feature_repos.universal.online_store.hybrid_online_store import (
+    HybridOnlineStoreCreator,
+)
+
+FULL_REPO_CONFIGS = [
+    IntegrationTestRepoConfig(online_store_creator=HybridOnlineStoreCreator),
+]
diff --git a/sdk/python/tests/integration/feature_repos/universal/online_store/hybrid_online_store.py b/sdk/python/tests/integration/feature_repos/universal/online_store/hybrid_online_store.py
new file mode 100644
index 00000000000..f0efbd11044
--- /dev/null
+++ b/sdk/python/tests/integration/feature_repos/universal/online_store/hybrid_online_store.py
@@ -0,0 +1,25 @@
+from tests.integration.feature_repos.universal.online_store_creator import (
+    OnlineStoreCreator,
+)
+
+
+class HybridOnlineStoreCreator(OnlineStoreCreator):
+    def create_online_store(self):
+        # Use Redis and SQLite as two backends for demonstration/testing, but mock Redis config for unit tests
+        return {
+            "type": "hybrid_online_store.HybridOnlineStore",
+            "online_stores": [
+                {
+                    "type": "redis",
+                    "conf": {
+                        "redis_type": "redis",
+                        "connection_string": "localhost:6379",
+                    },
+                },
+                {"type": "sqlite", "conf": {"path": "/tmp/feast_hybrid_test.db"}},
+            ],
+        }
+
+    def teardown(self):
+        # Implement any resource cleanup if needed (e.g., remove test DB files)
+        pass
diff --git a/sdk/python/tests/integration/online_store/test_hybrid_online_store.py b/sdk/python/tests/integration/online_store/test_hybrid_online_store.py
new file mode 100644
index 00000000000..4b9dad05ff8
--- /dev/null
+++ b/sdk/python/tests/integration/online_store/test_hybrid_online_store.py
@@ -0,0 +1,87 @@
+from datetime import datetime
+from unittest.mock import patch
+
+import pytest
+
+from feast import Entity, FeatureView, Field, FileSource, RepoConfig, ValueType
+from feast.infra.online_stores.hybrid_online_store.hybrid_online_store import (
+    HybridOnlineStore,
+    HybridOnlineStoreConfig,
+)
+from feast.protos.feast.types.EntityKey_pb2 import EntityKey
+from feast.protos.feast.types.Value_pb2 import Value
+from feast.types import PrimitiveFeastType
+
+
+@pytest.fixture
+def sample_entity():
+    return Entity(name="id", join_keys=["id"], value_type=ValueType.INT64)
+
+
+@pytest.fixture
+def sample_feature_view(sample_entity):
+    file_source = FileSource(
+        path="/tmp/feast_hybrid_test.parquet",
+        event_timestamp_column="event_timestamp",
+    )
+    return FeatureView(
+        name="test_fv",
+        entities=[sample_entity],
+        schema=[Field(name="feature1", dtype=PrimitiveFeastType.INT64)],
+        online=True,
+        tags={"tribe": "redis"},
+        source=file_source,
+    )
+
+
+@pytest.fixture
+def sample_repo_config():
+    # Minimal config for HybridOnlineStore with two backends (mocked for test)
+    return RepoConfig(
+        registry="test-registry.db",
+        project="test_project",
+        provider="local",
+        online_store=HybridOnlineStoreConfig(
+            online_stores=[
+                HybridOnlineStoreConfig.OnlineStoresWithConfig(
+                    type="redis",
+                    conf={"redis_type": "redis", "connection_string": "localhost:6379"},
+                ),
+                HybridOnlineStoreConfig.OnlineStoresWithConfig(
+                    type="sqlite",
+                    conf={"path": "/tmp/feast_hybrid_test.db"},
+                ),
+            ]
+        ),
+        offline_store=None,
+    )
+
+
+@pytest.mark.usefixtures("sample_entity", "sample_feature_view", "sample_repo_config")
+def test_hybrid_online_store_write_and_read(sample_repo_config, sample_feature_view):
+    with (
+        patch(
+            "feast.infra.online_stores.redis.RedisOnlineStore.online_write_batch"
+        ) as mock_write,
+        patch(
+            "feast.infra.online_stores.redis.RedisOnlineStore.online_read"
+        ) as mock_read,
+    ):
+        mock_write.return_value = None
+        mock_read.return_value = [(None, {"feature1": Value(int64_val=100)})]
+        store = HybridOnlineStore()
+        entity_key = EntityKey(
+            join_keys=["id"],
+            entity_values=[Value(int64_val=1)],
+        )
+        now = datetime.utcnow()
+        odata = [(entity_key, {"feature1": Value(int64_val=100)}, now, None)]
+        # Write to the online store (mocked)
+        store.online_write_batch(
+            sample_repo_config, sample_feature_view, odata, progress=None
+        )
+        # Read back (mocked)
+        result = store.online_read(
+            sample_repo_config, sample_feature_view, [entity_key]
+        )
+        assert result[0][1]["feature1"].int64_val == 100
diff --git a/sdk/python/tests/unit/infra/offline_stores/test_hybrid_offline_store.py b/sdk/python/tests/unit/infra/offline_stores/test_hybrid_offline_store.py
new file mode 100644
index 00000000000..21eac2044fa
--- /dev/null
+++ b/sdk/python/tests/unit/infra/offline_stores/test_hybrid_offline_store.py
@@ -0,0 +1,434 @@
+import datetime
+from unittest.mock import Mock, patch
+
+import pandas as pd
+import pyarrow
+import pytest
+
+from feast import FeatureView
+from feast.data_source import DataSource
+from feast.infra.offline_stores.bigquery import BigQueryOfflineStoreConfig
+from feast.infra.offline_stores.hybrid_offline_store.hybrid_offline_store import (
+    HybridOfflineStore,
+    HybridOfflineStoreConfig,
+    OfflineStoreConfig,
+)
+from feast.infra.online_stores.sqlite import SqliteOnlineStoreConfig
+from feast.repo_config import RepoConfig
+
+
+class MockDataSource(DataSource):
+    """Mock data source for testing."""
+
+    def __init__(
+        self,
+        name="mock",
+        timestamp_field="event_timestamp",
+        created_timestamp_column=None,
+        field_mapping=None,
+        date_partition_column=None,
+        description="",
+        tags=None,
+        owner="",
+    ):
+        self.name = name
+        self.timestamp_field = timestamp_field
+        self.created_timestamp_column = created_timestamp_column or ""
+        self.field_mapping = field_mapping or {}
+        self.date_partition_column = date_partition_column or ""
+        self.description = description
+        self.tags = tags or {}
+        self.owner = owner
+
+    @staticmethod
+    def from_proto(proto):
+        raise NotImplementedError("Not implemented for testing")
+
+    def to_proto(self):
+        raise NotImplementedError("Not implemented for testing")
+
+
+class BigquerySource(MockDataSource):
+    """Mock BigQuery source for testing. The name matches the pattern expected by HybridOfflineStore."""
+
+    pass
+
+
+class FileSource(MockDataSource):
+    """Mock File source for testing. The name matches the pattern expected by HybridOfflineStore."""
+
+    pass
+
+
+class MockRetrievalJob:
+    """Mock retrieval job for testing."""
+
+    def __init__(self, df=None, arrow=None):
+        self._df = df if df is not None else pd.DataFrame()
+        self._arrow = (
+            arrow if arrow is not None else pyarrow.Table.from_pandas(self._df)
+        )
+
+    def to_df(self, timeout=None):
+        return self._df
+
+    def to_arrow(self, timeout=None):
+        return self._arrow
+
+
+@pytest.fixture
+def sample_df():
+    return pd.DataFrame(
+        data={
+            "entity_id": [1, 2, 3],
+            "feature_value": [10.0, 20.0, 30.0],
+            "event_timestamp": [
+                datetime.datetime(2021, 1, 1),
+                datetime.datetime(2021, 1, 2),
+                datetime.datetime(2021, 1, 3),
+            ],
+        }
+    )
+
+
+@pytest.fixture
+def bigquery_offline_store():
+    """Create a mocked BigQuery offline store."""
+    mock_store = Mock()
+    mock_store.pull_latest_from_table_or_query.return_value = MockRetrievalJob(
+        df=pd.DataFrame({"entity_id": [1, 2], "feature": [1.0, 2.0]})
+    )
+    mock_store.get_historical_features.return_value = MockRetrievalJob(
+        df=pd.DataFrame({"entity_id": [1, 2], "feature": [1.0, 2.0]})
+    )
+    return mock_store
+
+
+@pytest.fixture
+def file_offline_store():
+    """Create a mocked File offline store."""
+    mock_store = Mock()
+    mock_store.pull_latest_from_table_or_query.return_value = MockRetrievalJob(
+        df=pd.DataFrame({"entity_id": [3, 4], "feature": [3.0, 4.0]})
+    )
+    mock_store.pull_all_from_table_or_query.return_value = MockRetrievalJob(
+        df=pd.DataFrame({"entity_id": [3, 4], "feature": [3.0, 4.0]})
+    )
+    mock_store.get_historical_features.return_value = MockRetrievalJob(
+        df=pd.DataFrame({"entity_id": [3, 4], "feature": [3.0, 4.0]})
+    )
+    return mock_store
+
+
+@pytest.fixture
+def repo_config():
+    """Create a repo config with hybrid offline store."""
+    return RepoConfig(
+        registry="registry.db",
+        project="test_project",
+        provider="local",
+        entity_key_serialization_version=3,  # Use recommended version to avoid deprecation warning
+        online_store=SqliteOnlineStoreConfig(type="sqlite"),
+        offline_store=HybridOfflineStoreConfig(
+            type="feast.infra.offline_stores.hybrid_offline_store.HybridOfflineStore",
+            offline_stores=[
+                OfflineStoreConfig(type="bigquery", conf={"dataset": "feast_test"}),
+                OfflineStoreConfig(type="file", conf={"path": "/tmp/feast_test"}),
+            ],
+        ),
+    )
+
+
+@pytest.fixture
+def mock_feature_view():
+    """Create a mocked feature view."""
+    feature_view = Mock(spec=FeatureView)
+    feature_view.batch_source = BigquerySource()
+    return feature_view
+
+
+@pytest.fixture
+def mock_feature_views():
+    """Create mocked feature views with the same source type."""
+    fv1 = Mock(spec=FeatureView)
+    fv1.batch_source = BigquerySource(name="source1")
+    fv2 = Mock(spec=FeatureView)
+    fv2.batch_source = BigquerySource(name="source2")
+    return [fv1, fv2]
+
+
+@pytest.fixture
+def mock_mixed_feature_views():
+    """Create mocked feature views with different source types."""
+    fv1 = Mock(spec=FeatureView)
+    fv1.batch_source = BigquerySource()
+    fv2 = Mock(spec=FeatureView)
+    fv2.batch_source = FileSource()
+    return [fv1, fv2]
+
+
+class TestHybridOfflineStore:
+    def test_get_store_and_config_by_source(self, repo_config, monkeypatch):
+        """Test getting the correct store for a data source."""
+        # Mock the get_offline_store_from_config function
+        mock_get_store = Mock()
+        mock_get_store.return_value = Mock()
+
+        # Mock the get_offline_config_from_type function
+        mock_config_type = Mock()
+        mock_config_type.return_value = Mock()
+        mock_config_type.return_value.return_value = BigQueryOfflineStoreConfig(
+            type="bigquery", dataset="feast_test"
+        )
+
+        monkeypatch.setattr(
+            "feast.infra.offline_stores.hybrid_offline_store.hybrid_offline_store.get_offline_store_from_config",
+            mock_get_store,
+        )
+        monkeypatch.setattr(
+            "feast.infra.offline_stores.hybrid_offline_store.hybrid_offline_store.get_offline_config_from_type",
+            mock_config_type,
+        )
+
+        # Test with BigQuery source
+        bigquery_source = BigquerySource()
+        store, config = HybridOfflineStore._get_store_and_config_by_source(
+            repo_config, bigquery_source
+        )
+
+        # Verify correct config type was requested
+        mock_config_type.assert_called_with("bigquery")
+
+        # Verify correct store was requested
+        mock_get_store.assert_called_once()
+
+        # Test with an unknown source type (should raise ValueError)
+        unknown_source = Mock()
+        unknown_source.__class__.__name__ = "UnknownSource"
+
+        with pytest.raises(
+            ValueError,
+            match=r"No offline store configuration found for source type 'UnknownSource'",
+        ):
+            HybridOfflineStore._get_store_and_config_by_source(
+                repo_config, unknown_source
+            )
+
+    def test_validate_feature_views_source_type(
+        self, mock_feature_views, mock_mixed_feature_views
+    ):
+        """Test validation of feature view source types."""
+        # Should pass with single feature view
+        HybridOfflineStore._validate_feature_views_source_type([mock_feature_views[0]])
+
+        # Should pass with multiple feature views of same type
+        HybridOfflineStore._validate_feature_views_source_type(mock_feature_views)
+
+        # Should fail with mixed feature view types
+        with pytest.raises(
+            ValueError, match="All feature views must have the same source type"
+        ):
+            HybridOfflineStore._validate_feature_views_source_type(
+                mock_mixed_feature_views
+            )
+
+    @patch(
+        "feast.infra.offline_stores.hybrid_offline_store.hybrid_offline_store.get_offline_store_from_config"
+    )
+    def test_pull_latest_from_table_or_query(
+        self, mock_get_store, repo_config, bigquery_offline_store
+    ):
+        """Test pull_latest_from_table_or_query routes to correct store."""
+        mock_get_store.return_value = bigquery_offline_store
+
+        data_source = BigquerySource()
+        start_date = datetime.datetime(2021, 1, 1)
+        end_date = datetime.datetime(2021, 1, 2)
+
+        result = HybridOfflineStore.pull_latest_from_table_or_query(
+            config=repo_config,
+            data_source=data_source,
+            join_key_columns=["entity_id"],
+            feature_name_columns=["feature"],
+            timestamp_field="event_timestamp",
+            created_timestamp_column=None,
+            start_date=start_date,
+            end_date=end_date,
+        )
+
+        # Verify the BigQuery store was called with correct parameters
+        bigquery_offline_store.pull_latest_from_table_or_query.assert_called_once()
+        call_kwargs = (
+            bigquery_offline_store.pull_latest_from_table_or_query.call_args.kwargs
+        )
+        assert call_kwargs["data_source"] == data_source
+        assert call_kwargs["join_key_columns"] == ["entity_id"]
+        assert call_kwargs["feature_name_columns"] == ["feature"]
+        assert call_kwargs["timestamp_field"] == "event_timestamp"
+
+        # Verify result
+        df = result.to_df()
+        assert len(df) == 2
+        assert "entity_id" in df.columns
+        assert "feature" in df.columns
+
+    @patch(
+        "feast.infra.offline_stores.hybrid_offline_store.hybrid_offline_store.get_offline_store_from_config"
+    )
+    def test_get_historical_features_same_source(
+        self, mock_get_store, repo_config, bigquery_offline_store, mock_feature_views
+    ):
+        """Test get_historical_features with feature views of the same source type."""
+        mock_get_store.return_value = bigquery_offline_store
+
+        entity_df = pd.DataFrame({"entity_id": [1, 2]})
+
+        result = HybridOfflineStore.get_historical_features(
+            config=repo_config,
+            feature_views=mock_feature_views,
+            feature_refs=["feature_view:feature"],
+            entity_df=entity_df,
+            registry=Mock(),
+            project="test_project",
+            full_feature_names=True,
+        )
+
+        # Verify the BigQuery store was called with correct parameters
+        bigquery_offline_store.get_historical_features.assert_called_once()
+        call_kwargs = bigquery_offline_store.get_historical_features.call_args.kwargs
+        assert call_kwargs["feature_views"] == mock_feature_views
+        assert call_kwargs["feature_refs"] == ["feature_view:feature"]
+        assert call_kwargs["project"] == "test_project"
+        assert call_kwargs[
+            "full_feature_names"
+        ]  # Use truth test instead of comparing to True
+
+        # Verify result
+        df = result.to_df()
+        assert len(df) == 2
+        assert "entity_id" in df.columns
+        assert "feature" in df.columns
+
+    @patch(
+        "feast.infra.offline_stores.hybrid_offline_store.hybrid_offline_store.get_offline_store_from_config"
+    )
+    def test_get_historical_features_mixed_sources(
+        self, mock_get_store, repo_config, mock_mixed_feature_views
+    ):
+        """Test get_historical_features with feature views of different source types."""
+        with pytest.raises(
+            ValueError, match="All feature views must have the same source type"
+        ):
+            HybridOfflineStore.get_historical_features(
+                config=repo_config,
+                feature_views=mock_mixed_feature_views,
+                feature_refs=["feature_view:feature"],
+                entity_df=pd.DataFrame({"entity_id": [1, 2]}),
+                registry=Mock(),
+                project="test_project",
+                full_feature_names=True,
+            )

+    @patch(
+        "feast.infra.offline_stores.hybrid_offline_store.hybrid_offline_store.get_offline_store_from_config"
+    )
+    @patch(
+        "feast.infra.offline_stores.hybrid_offline_store.hybrid_offline_store.get_offline_config_from_type"
+    )
+    @patch(
+        "feast.infra.offline_stores.hybrid_offline_store.hybrid_offline_store.RepoConfig"
+    )
+    def test_pull_all_from_table_or_query(
+        self,
+        mock_repo_config,
+        mock_config_type,
+        mock_get_store,
+        repo_config,
+        file_offline_store,
+    ):
+        """Test pull_all_from_table_or_query routes to correct store."""
+        # Set up the mock for config type conversion
+        mock_config = Mock()
+        mock_config_type.return_value = Mock()
+        mock_config_type.return_value.return_value = mock_config
+
+        # Set up the RepoConfig mock to avoid validation errors
+        mock_repo_config.return_value = repo_config
+
+        # Set up the store mock
+        mock_get_store.return_value = file_offline_store
+
+        data_source = FileSource()
+
+        result = HybridOfflineStore.pull_all_from_table_or_query(
+            config=repo_config,
+            data_source=data_source,
+            join_key_columns=["entity_id"],
+            feature_name_columns=["feature"],
+            timestamp_field="event_timestamp",
+        )
+
+        # Verify the correct config type was requested
+        mock_config_type.assert_called_with("file")
+
+        # Verify the File store was called with correct parameters
+        file_offline_store.pull_all_from_table_or_query.assert_called_once()
+        call_kwargs = file_offline_store.pull_all_from_table_or_query.call_args.kwargs
+        assert call_kwargs["data_source"] == data_source
+        assert call_kwargs["join_key_columns"] == ["entity_id"]
+        assert call_kwargs["feature_name_columns"] == ["feature"]
+        assert call_kwargs["timestamp_field"] == "event_timestamp"
+
+        # Verify result
+        df = result.to_df()
+        assert len(df) == 2
+        assert "entity_id" in df.columns
+        assert "feature" in df.columns
+
+    @patch(
+        "feast.infra.offline_stores.hybrid_offline_store.hybrid_offline_store.get_offline_store_from_config"
+    )
+    def test_offline_write_batch(
+        self,
+        mock_get_store,
+        repo_config,
+        bigquery_offline_store,
+        mock_feature_view,
+        sample_df,
+    ):
+        """Test offline_write_batch routes to correct store."""
+        mock_get_store.return_value = bigquery_offline_store
+
+        table = pyarrow.Table.from_pandas(sample_df)
+        progress_callback = Mock()
+
+        HybridOfflineStore.offline_write_batch(
+            config=repo_config,
+            feature_view=mock_feature_view,
+            table=table,
+            progress=progress_callback,
+        )
+
+        # Verify the BigQuery store was called with correct parameters
+        bigquery_offline_store.offline_write_batch.assert_called_once()
+        call_kwargs = bigquery_offline_store.offline_write_batch.call_args.kwargs
+        assert call_kwargs["feature_view"] == mock_feature_view
+        assert call_kwargs["table"] == table
+        assert call_kwargs["progress"] == progress_callback
+
+    @patch(
+        "feast.infra.offline_stores.hybrid_offline_store.hybrid_offline_store.get_offline_store_from_config"
+    )
+    def test_prepare_store_config(self, mock_get_store, repo_config):
+        """Test store config preparation."""
+        store_config = repo_config.offline_store.offline_stores[0]
+        result = HybridOfflineStore._prepare_store_config(repo_config, store_config)
+
+        # Verify key transformations
+        assert result["registry"] == repo_config.__dict__["registry_config"]
+
+        # The offline_store config will include type, so we need to check individual expected keys
+        assert result["offline_store"]["dataset"] == store_config.conf["dataset"]
+        assert result["offline_store"]["type"] == store_config.type
+
+        assert result["online_store"] == repo_config.__dict__["online_config"]
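
The tag-based routing that `HybridOnlineStore` performs in this diff can be sketched independently of Feast. The `StoreConfig` class and `resolve_backend` function below are hypothetical stand-ins (not part of the Feast API) that mirror the matching rule used above: the FeatureView's routing tag (default tag name `"tribe"`) is compared case-insensitively against the last dotted segment of each configured store type.

```python
class StoreConfig:
    """Hypothetical stand-in for one entry of the `online_stores` config list."""

    def __init__(self, type_, conf):
        self.type = type_  # e.g. "redis" or "a.b.RedisOnlineStore"
        self.conf = conf


def resolve_backend(tags, store_configs, routing_tag="tribe"):
    """Return the store config whose type matches the view's routing tag."""
    tribe = tags.get(routing_tag)
    if not tribe:
        # Mirrors the ValueError raised when the FeatureView lacks the tag.
        raise ValueError(f"FeatureView must have a '{routing_tag}' tag.")
    for cfg in store_configs:
        # Compare against the last path segment so fully qualified types
        # like "a.b.RedisOnlineStore" still match a short tag value.
        if tribe.lower() == cfg.type.split(".")[-1].lower():
            return cfg
    # Mirrors the NotImplementedError raised when no backend matches.
    raise NotImplementedError(f"No online store found for tag '{tribe}'.")


stores = [
    StoreConfig("redis", {"connection_string": "localhost:6379"}),
    StoreConfig("sqlite", {"path": "/tmp/feast_hybrid_test.db"}),
]
print(resolve_backend({"tribe": "sqlite"}, stores).conf["path"])
# prints "/tmp/feast_hybrid_test.db"
```

Matching on the last dotted segment is what lets the same tag value select either a built-in short type (`redis`) or a fully qualified custom class path, as the `type.split(".")[-1].lower()` comparisons in the diff do.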