@@ -499,51 +499,37 @@ async def update(self:Table, record=None, pk_values=None, **kwargs):
499499 return await self ._rec (sql , * row .values (), * args , err = f"{ self .name } [{ pk_values } ]" )
500500
501501# %% ../nbs/00_core.ipynb #4e7c77dc
502- def _vals_sql (rows , types ):
503- "Build the VALUES clause and list of arguments"
504- n , args , parts = len (types ), [], []
505- ct = [t .split ('(' )[0 ].strip () for t in types ]
506- for i ,row in enumerate (rows ):
507- parts .append ('(' + ', ' .join (f'${ i * n + j + 1 } ::{ ct [j ]} ' for j in range (n )) + ')' )
508- args .extend (row .values ())
509- return args , ', ' .join (parts )
510-
511- # %% ../nbs/00_core.ipynb #1a1e9bfe
512- def _join_where (pks , xtra_id , n_args ):
513- "Get the join WHERE clause for `updates` and the xtra args needed for it (if any)"
514- pw = ' AND ' .join (f't."{ pk } "=v."{ pk } "' for pk in pks )
515- args = []
516- if xtra_id :
517- pw += ' AND ' + ' AND ' .join (f't."{ k } "=${ n_args + i + 1 } ' for i ,k in enumerate (xtra_id ))
518- args = list (xtra_id .values ())
519- return pw , args
502+ def _norm_rows (rows ):
503+ "Normalize rows to same columns, return (cols, list of arg tuples)"
504+ cols = list (dict .fromkeys (k for r in rows for k in r ))
505+ return cols , [tuple (r .get (c ) for c in cols ) for r in rows ]
520506
521507# %% ../nbs/00_core.ipynb #51c5ace8
522508@patch
523509async def updates (self :Table , records ):
524- "Update multiple rows in one query and return them "
525- if not records : return []
510+ "Update multiple rows"
511+ if not records : return
526512 rows = [_process_row (r , {}) for r in records ]
527- cols = list (rows [0 ].keys ())
528- args , vals = _vals_sql (rows , [self .cols [c ] for c in cols ])
513+ cols , norm = _norm_rows (rows )
529514 non_pk = [c for c in cols if c not in self .pks ]
530- sets = ', ' .join (f'"{ c } "=v."{ c } "' for c in non_pk )
531- pw , xa = _join_where (self .pks , self .xtra_id , len (args ))
532- qcols = ', ' .join (f'"{ c } "' for c in cols )
533- sql = f'UPDATE { self } AS t SET { sets } FROM (VALUES { vals } ) AS v({ qcols } ) WHERE { pw } RETURNING t.*'
534- return await self ._recs (sql , * args , * xa )
515+ sets = ', ' .join (f'"{ c } "=${ i + 1 } ' for i ,c in enumerate (non_pk ))
516+ pw = _pk_where (self .pks , len (non_pk ))
517+ xa = tuple (self .xtra_id .values ())
518+ if xa : pw += ' AND ' + ' AND ' .join (f'"{ k } "=${ len (non_pk )+ len (self .pks )+ i + 1 } ' for i ,k in enumerate (self .xtra_id ))
519+ ci = {c :i for i ,c in enumerate (cols )}
520+ args = [tuple (row [ci [c ]] for c in non_pk ) + tuple (row [ci [pk ]] for pk in self .pks ) + xa for row in norm ]
521+ await self .db .executemany (f'UPDATE { self } SET { sets } WHERE { pw } ' , args )
535522
536523# %% ../nbs/00_core.ipynb #27ebb35c
537524@patch
538525async def inserts (self :Table , records ):
539- "Insert multiple rows in one query and return them "
540- if not records : return []
526+ "Insert multiple rows"
527+ if not records : return
541528 rows = [_process_row (r , self .xtra_id ) for r in records ]
542- cols = list (rows [0 ].keys ())
543- args , vals = _vals_sql (rows , [self .cols [c ] for c in cols ])
529+ cols , args = _norm_rows (rows )
544530 qcols = ', ' .join (f'"{ c } "' for c in cols )
545- sql = f'INSERT INTO { self } ( { qcols } ) VALUES { vals } RETURNING *'
546- return await self ._recs ( sql , * args )
531+ vals = ', ' . join ( f'$ { i + 1 } ' for i in range ( len ( cols )))
532+ await self .db . executemany ( f'INSERT INTO { self } ( { qcols } ) VALUES ( { vals } )' , args )
547533
548534# %% ../nbs/00_core.ipynb #f9968e6f
549535@patch
@@ -626,6 +612,38 @@ async def upsert(self:Table, record=None, **kwargs):
626612 updates = ', ' .join (f'"{ k } "=EXCLUDED."{ k } "' for k in row if k != pk )
627613 return await self ._rec (f'INSERT INTO { self } ({ cols } ) VALUES ({ vals } ) ON CONFLICT ("{ pk } ") DO UPDATE SET { updates } RETURNING *' , * row .values ())
628614
615+ # %% ../nbs/00_core.ipynb #9abd1d88
616+ @patch
617+ async def upserts (self :Table , records ):
618+ "Insert or update multiple rows"
619+ if not records : return
620+ pk = self .pks [0 ]
621+ proc = [_process_row (r , self .xtra_id ) for r in records ]
622+ has_pk = [r for r in proc if r .get (pk ) is not None ]
623+ no_pk = [{k :v for k ,v in r .items () if k != pk } for r in proc if r .get (pk ) is None ]
624+ if has_pk :
625+ cols , args = _norm_rows (has_pk )
626+ qcols = ', ' .join (f'"{ c } "' for c in cols )
627+ vals = ', ' .join (f'${ i + 1 } ' for i in range (len (cols )))
628+ upd = ', ' .join (f'"{ c } "=EXCLUDED."{ c } "' for c in cols if c != pk )
629+ await self .db .executemany (f'INSERT INTO { self } ({ qcols } ) VALUES ({ vals } ) ON CONFLICT ("{ pk } ") DO UPDATE SET { upd } ' , args )
630+ if no_pk : await self .inserts (no_pk )
631+
632+ # %% ../nbs/00_core.ipynb #4962e324
633+ @patch
634+ async def groupby (self :Table , groupflds :list [str ], aggs :list [str ]):
635+ "Dict of `groupflds` to `aggs` results via GROUP BY"
636+ gcols ,acols = ',' .join (f'"{ g } "' for g in groupflds ), ',' .join (aggs )
637+ where , args = _add_xtra (self , None , [])
638+ sql = f"SELECT { gcols } ,{ acols } FROM { self } "
639+ if where : sql += f" WHERE { where } "
640+ sql += f" GROUP BY { gcols } "
641+ rows = await self .db .q (sql , * args )
642+ ng = len (groupflds )
643+ def _key (r ): return r [0 ] if ng == 1 else tuple (r [i ] for i in range (ng ))
644+ def _val (r ): return r [ng ] if len (aggs )== 1 else tuple (r [ng + i ] for i in range (len (aggs )))
645+ return {_key (r ): _val (r ) for r in rows }
646+
629647# %% ../nbs/00_core.ipynb #6be00990
630648async def create_pool (* args , ** kwargs ):
631649 kwargs .setdefault ('record_class' , FRecord )
0 commit comments