From fc08fd901e86bf046dd1c4c9b8318d97279b7b0f Mon Sep 17 00:00:00 2001 From: Mathan Date: Sat, 25 Nov 2023 23:31:27 -0500 Subject: [PATCH 1/2] postgres integration --- docs/postgres_setup.md | 25 +++++++++++++++++++++++++ evadb/catalog/sql_config.py | 13 ++++++++----- evadb/database.py | 15 ++++++++++++++- 3 files changed, 47 insertions(+), 6 deletions(-) create mode 100644 docs/postgres_setup.md diff --git a/docs/postgres_setup.md b/docs/postgres_setup.md new file mode 100644 index 0000000000..854070b45b --- /dev/null +++ b/docs/postgres_setup.md @@ -0,0 +1,25 @@ +## INSTRUCTIONS TO SET UP POSTGRES DATABASE + +These were the steps followed on a Mac system to install postgres and configure it: + +1. Install postgres + ``` brew install postgresql``` +2. Start PostgresSQL: +```brew services start postgressql``` +3. Create a user and database: + + Username: evadb + + Password: password + + - ```psql postgres``` + - ```CREATE ROLE evadb WITH LOGIN PASSWORD 'password';``` + - ```ALTER ROLE evadb CREATEDB;``` + - ```\q``` +4. Login as your new user + + ``` psql -d postgres -U evadb``` +5. create the database evadb + + ```CREATE DATABASE evadb;``` +6. ```pip install psycopg2``` \ No newline at end of file diff --git a/evadb/catalog/sql_config.py b/evadb/catalog/sql_config.py index fed6630f3e..79e1ecdeed 100644 --- a/evadb/catalog/sql_config.py +++ b/evadb/catalog/sql_config.py @@ -71,11 +71,14 @@ def __init__(self, uri): # set echo=True to log SQL # Default to SQLite. - connect_args = {"timeout": 1000} - self.engine = create_engine( - self.worker_uri, - connect_args=connect_args, - ) + # connect_args = {"timeout": 1000} + # self.engine = create_engine( + # self.worker_uri, + # connect_args=connect_args, + # ) + + # Postgres version + self.engine = create_engine(self.worker_uri) if self.engine.url.get_backend_name() == "sqlite": # enforce foreign key constraint and wal logging for sqlite diff --git a/evadb/database.py b/evadb/database.py index 9c22d5b9ff..47c11fcaed 100644 --- a/evadb/database.py +++ b/evadb/database.py @@ -41,10 +41,23 @@ def catalog(self) -> "CatalogManager": return self.catalog_func(self.catalog_uri) -def get_default_db_uri(evadb_dir: Path): +def get_default_db_uri_sqlite(evadb_dir: Path): # Default to sqlite. return f"sqlite:///{evadb_dir.resolve()}/{DB_DEFAULT_NAME}" +def get_default_db_uri(evadb_dir: Path): + """ + Generates a PostgreSQL connection URI for the local database. + + Returns: + str: A PostgreSQL connection URI. + """ + user = "evadb" + password = "password" + host = "localhost" + port = 5432 # Default PostgreSQL port + db_name = "evadb" + return f"postgresql://{user}:{password}@{host}:{port}/{db_name}" def init_evadb_instance( db_dir: str, host: str = None, port: int = None, custom_db_uri: str = None From 20c5cb6f4e56f7c37593510c84c29f7a970e7235 Mon Sep 17 00:00:00 2001 From: Mathan Date: Sat, 25 Nov 2023 23:33:16 -0500 Subject: [PATCH 2/2] string_agg implementation --- evadb/expression/abstract_expression.py | 1 + evadb/expression/aggregation_expression.py | 6 ++++++ evadb/models/storage/batch.py | 16 ++++++++++++++++ evadb/parser/evadb.lark | 4 ++++ evadb/parser/lark_visitor/_functions.py | 10 ++++++++++ 5 files changed, 37 insertions(+) diff --git a/evadb/expression/abstract_expression.py b/evadb/expression/abstract_expression.py index 9b72f32e68..8551c4ec4f 100644 --- a/evadb/expression/abstract_expression.py +++ b/evadb/expression/abstract_expression.py @@ -56,6 +56,7 @@ class ExpressionType(IntEnum): AGGREGATION_FIRST = auto() AGGREGATION_LAST = auto() AGGREGATION_SEGMENT = auto() + AGGREGATION_STRING_AGG = auto() CASE = auto() # add other types diff --git a/evadb/expression/aggregation_expression.py b/evadb/expression/aggregation_expression.py index f1ba6b16c4..c57cc73aaa 100644 --- a/evadb/expression/aggregation_expression.py +++ b/evadb/expression/aggregation_expression.py @@ -54,6 +54,12 @@ def evaluate(self, *args, **kwargs): batch.aggregate("min") elif self.etype == ExpressionType.AGGREGATION_MAX: batch.aggregate("max") + elif self.etype == ExpressionType.AGGREGATION_STRING_AGG: + # Assuming two children: the column and the delimiter + column_to_aggregate = self.get_child(0).evaluate(*args, **kwargs) + delimiter = kwargs.get('delimiter') + batch.aggregate_string_aggregation(column_to_aggregate, delimiter) + batch.reset_index() column_name = self.etype.name diff --git a/evadb/models/storage/batch.py b/evadb/models/storage/batch.py index 43e69cc4fc..e1fdddbe7b 100644 --- a/evadb/models/storage/batch.py +++ b/evadb/models/storage/batch.py @@ -450,3 +450,19 @@ def to_numpy(self): def rename(self, columns) -> None: "Rename column names" self._frames.rename(columns=columns, inplace=True) + + def aggregate_string_aggregation(self, column_name:str, delimiter:str): + # First, ensure the column data is in string format + string_column = self._frames[column_name].astype(str) + + def aggregate_column(data, sep): + # Join the data using the provided separator + aggregated_string = sep.join(data) + return aggregated_string + + aggregated_result = aggregate_column(string_column, delimiter) + + aggregated_dataframe = pd.DataFrame({column_name: [aggregated_result]}) + + # Update the original DataFrame with the new aggregated DataFrame + self._frames = aggregated_dataframe diff --git a/evadb/parser/evadb.lark b/evadb/parser/evadb.lark index 4b96bf647b..998a9360bd 100644 --- a/evadb/parser/evadb.lark +++ b/evadb/parser/evadb.lark @@ -297,6 +297,7 @@ or_replace: OR REPLACE function_call: function ->function_call | aggregate_windowed_function ->aggregate_function_call + | string_agg_function function: simple_id "(" (STAR | function_args)? ")" dotted_id? @@ -306,6 +307,9 @@ aggregate_windowed_function: aggregate_function_name "(" function_arg ")" aggregate_function_name: AVG | MAX | MIN | SUM | FIRST | LAST | SEGMENT +string_agg_function: "STRING_AGG" LR_BRACKET expression COMMA string_literal RR_BRACKET + + function_args: (function_arg) ("," function_arg)* function_arg: constant | expression diff --git a/evadb/parser/lark_visitor/_functions.py b/evadb/parser/lark_visitor/_functions.py index 2b2c180953..df1e682400 100644 --- a/evadb/parser/lark_visitor/_functions.py +++ b/evadb/parser/lark_visitor/_functions.py @@ -134,11 +134,14 @@ def get_aggregate_function_type(self, agg_func_name): agg_func_type = ExpressionType.AGGREGATION_LAST elif agg_func_name == "SEGMENT": agg_func_type = ExpressionType.AGGREGATION_SEGMENT + elif agg_func_name == "STRING_AGG": + agg_func_type = ExpressionType.AGGREGATION_STRING_AGG return agg_func_type def aggregate_windowed_function(self, tree): agg_func_arg = None agg_func_name = None + agg_func_args = [] for child in tree.children: if isinstance(child, Tree): @@ -156,6 +159,13 @@ def aggregate_windowed_function(self, tree): else: agg_func_arg = TupleValueExpression(name="id") + if agg_func_name == "STRING_AGG": + if len(agg_func_args) != 2: + raise ValueError("String Agg requires exactly two arguments") + agg_func_type = self.get_aggregate_function_type(agg_func_name) + agg_expr = AggregationExpression(agg_func_type, None, *agg_func_args) + return agg_expr + agg_func_type = self.get_aggregate_function_type(agg_func_name) agg_expr = AggregationExpression(agg_func_type, None, agg_func_arg) return agg_expr