-
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcaa_importer.py
More file actions
executable file
·352 lines (289 loc) · 14.2 KB
/
caa_importer.py
File metadata and controls
executable file
·352 lines (289 loc) · 14.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
#!/usr/bin/env python3
#
# This module imports data from a PostgreSQL database into a local SQLite data store
# using a persistent connection and batched processing for large datasets.
# The importer has been updated to match the new datastore schema and now uses
# python-dotenv for environment variable management.
#
# Before running, ensure you have the required libraries installed:
# pip install peewee psycopg2-binary tqdm click python-dotenv requests
#
# You must also ensure that the 'store.py' file is in the same directory.
import logging
import os
import sys
import time
import click
import psycopg2
from dotenv import load_dotenv
from store import CAABackupDataStore, CoverStatus
# Configure the root logger once at import time: INFO and above, with each
# line prefixed by a timestamp and the level name.
logging.basicConfig(
    format="%(asctime)s %(levelname)s %(message)s",
    level=logging.INFO,
)

# Minimum number of seconds between two "Imported: x / y" progress lines.
IMPORT_PROGRESS_INTERVAL = 10
# -----------------------------------------------------------------------------
# The main class for the import project.
# -----------------------------------------------------------------------------
class CAAImporter:
    """
    Import Cover Art Archive rows from PostgreSQL into a local SQLite data store.

    Rows are streamed from PostgreSQL through server-side (named) cursors in
    batches of ``batch_size`` and written with ``CAABackupDataStore.bulk_add``.
    """

    # FROM/JOIN clause shared by every query: cover art rows joined to their release.
    _FROM_CLAUSE = (
        "FROM cover_art_archive.cover_art caa "
        "JOIN musicbrainz.release r ON caa.release = r.id"
    )
    # Names of the server-side cursors used to stream the data queries.
    _FULL_CURSOR_NAME = "caa_full_import"
    _INCREMENTAL_CURSOR_NAME = "caa_incremental_import"

    def __init__(self, pg_conn_string: str, db_path: str, batch_size: int = 20000):
        """
        Initializes the importer.

        Args:
            pg_conn_string (str): The connection string for the PostgreSQL DB.
            db_path (str): The path to the local SQLite database file for the datastore.
            batch_size (int): The number of records to fetch and insert per batch.

        Raises:
            ValueError: If ``batch_size`` is smaller than 1 (a zero-sized fetch
                would end the import immediately without importing anything).
        """
        if batch_size < 1:
            raise ValueError(f"batch_size must be >= 1, got {batch_size}")
        self.pg_conn_string = pg_conn_string
        self.batch_size = batch_size
        self.datastore = CAABackupDataStore(db_path=db_path)
        self.pg_conn = None

    def connect_to_postgres(self):
        """
        Opens the PostgreSQL connection used for one import run.

        The session is switched to REPEATABLE READ so that every query issued
        in the run's transaction (count, data stream, latest date_uploaded
        lookup) sees one snapshot. Under the default READ COMMITTED, rows
        uploaded while a long import is running could be newer than the
        imported data yet older than the stored import timestamp, and later
        incremental runs would never pick them up.

        Returns:
            The connection object if successful, otherwise None (the error is logged).
        """
        try:
            conn = psycopg2.connect(self.pg_conn_string)
        except psycopg2.Error as e:
            logging.error(f"PostgreSQL connection error: {e}")
            return None
        try:
            # Must be set before the first statement opens a transaction.
            conn.set_session(isolation_level="REPEATABLE READ")
        except psycopg2.Error as e:
            logging.error(f"PostgreSQL connection error: {e}")
            conn.close()
            return None
        self.pg_conn = conn
        return conn

    def get_caa_records(self, cursor: psycopg2.extensions.cursor, include_date=False):
        """
        Fetches the next batch of CAA records from an executed query.

        Args:
            cursor (psycopg2.extensions.cursor): The cursor to fetch records from.
                Rows are expected to be (caa_id, release_mbid, mime_type[, date_uploaded]).
            include_date (bool): If True, expects a 4th column (date_uploaded).

        Returns:
            list: Up to ``batch_size`` dictionaries, each marked
            ``CoverStatus.NOT_DOWNLOADED``; an empty list once the cursor is exhausted.
        """
        records = []
        for row in cursor.fetchmany(self.batch_size):
            record = {
                "caa_id": row[0],
                "release_mbid": row[1],
                "mime_type": row[2],
                "status": CoverStatus.NOT_DOWNLOADED,
            }
            if include_date:
                record["date_uploaded"] = row[3]
            records.append(record)
        return records

    def _import_batches(self, cursor, total_records, include_date=False, continue_on_error=False):
        """
        Drains ``cursor`` into the datastore one batch at a time.

        Must be called while the datastore connection is open.

        Args:
            cursor: Cursor on which the data query has already been executed.
            total_records (int): Expected row count, used only for progress logging.
            include_date (bool): Whether rows carry a 4th date_uploaded column.
            continue_on_error (bool): If True, a failing ``bulk_add`` is logged and
                its batch skipped; otherwise the exception propagates.

        Returns:
            tuple: (records successfully handed to ``bulk_add``,
            records in batches whose ``bulk_add`` failed,
            latest non-null date_uploaded seen, or None).
        """
        imported = 0
        failed = 0
        latest_date_uploaded = None
        last_log = time.time()
        while True:
            records = self.get_caa_records(cursor, include_date=include_date)
            if not records:
                break

            if include_date:
                # Track the newest upload among all rows read, including rows in
                # batches that later fail, so the stored timestamp matches the
                # data actually read from this snapshot.
                for record in records:
                    uploaded = record["date_uploaded"]
                    if uploaded is not None and (latest_date_uploaded is None or uploaded > latest_date_uploaded):
                        latest_date_uploaded = uploaded

            try:
                self.datastore.bulk_add(records)
            except Exception as e:
                if not continue_on_error:
                    raise
                failed += len(records)
                logging.exception(f"Error adding batch: {e}")
            else:
                imported += len(records)

            now = time.time()
            if now - last_log >= IMPORT_PROGRESS_INTERVAL:
                logging.info(f"Imported: {imported} / {total_records}")
                last_log = now
        return imported, failed, latest_date_uploaded

    def run_import(self):
        """
        Runs a full import of every cover art row into the datastore.

        Rows are streamed through a server-side cursor, so memory use is
        bounded by ``batch_size`` rather than by the size of the result set.
        After the import, the import timestamp is set to the latest
        date_uploaded from Postgres (same snapshot as the imported rows).
        Finally, the unique caa_id index is created.

        PostgreSQL errors are logged, not raised. The connection is always closed.
        """
        logging.info("Starting import process...")
        # Initialize the data store's table
        self.datastore.create()
        if not self.connect_to_postgres():
            logging.error("Import failed due to database connection error.")
            return
        try:
            # Row count, used only for progress logging.
            with self.pg_conn.cursor() as cursor:
                cursor.execute(f"SELECT count(*) {self._FROM_CLAUSE}")
                total_records = cursor.fetchone()[0]

            # A named cursor keeps the result set on the server and fetches
            # batch_size rows per fetchmany() call.
            with self.pg_conn.cursor(name=self._FULL_CURSOR_NAME) as cursor:
                cursor.execute(f"SELECT caa.id, r.gid, caa.mime_type {self._FROM_CLAUSE} ORDER BY r.gid")
                # Open the datastore connection once for the entire import process.
                with self.datastore:
                    total_imported, _, _ = self._import_batches(cursor, total_records)
            logging.info(f"Import process complete. Total records imported: {total_imported}")

            latest_ts = self.datastore.fetch_latest_date_uploaded(self.pg_conn)
            if latest_ts:
                self.datastore.update_import_timestamp(latest_ts)
                logging.info(f"Updated import timestamp to: {latest_ts}")
            else:
                logging.warning("Could not fetch latest date_uploaded from Postgres.")

            # Build the unique index only after the bulk load.
            logging.info("Creating unique index on caa_id for optimal query performance...")
            from store import CAABackup

            with self.datastore:
                CAABackup.create_caa_id_index()
        except psycopg2.Error as e:
            logging.error(f"PostgreSQL query error: {e}")
        finally:
            if self.pg_conn:
                self.pg_conn.close()

    def run_import_incremental(self):
        """
        Imports only rows uploaded after the last recorded import timestamp.

        If no timestamp is stored yet, every row is imported. A batch whose
        ``bulk_add`` fails is logged and skipped. The timestamp is still
        advanced past it so the same rows are not re-queried every cycle,
        which means skipped rows are NOT retried by later runs.

        The new timestamp is the newest date_uploaded among the rows read. It
        falls back to ``fetch_latest_date_uploaded`` when no row carried a date.

        PostgreSQL errors are logged, not raised. The connection is always closed.
        """
        logging.info("Starting incremental import process...")
        # Initialize the data store's table
        self.datastore.create()
        if not self.connect_to_postgres():
            logging.error("Incremental import failed due to database connection error.")
            return
        try:
            with self.datastore:
                last_import_date = self.datastore.get_last_import_timestamp()
                if last_import_date:
                    logging.info(f"Last import was at: {last_import_date}")
                    where_clause = " WHERE caa.date_uploaded > %s"
                    query_params = (last_import_date,)
                else:
                    logging.info("No previous import found, importing all records...")
                    where_clause = ""
                    query_params = ()

                count_query = f"SELECT count(*) {self._FROM_CLAUSE}{where_clause}"
                data_query = (
                    f"SELECT caa.id, r.gid, caa.mime_type, caa.date_uploaded "
                    f"{self._FROM_CLAUSE}{where_clause} ORDER BY caa.date_uploaded"
                )

                with self.pg_conn.cursor() as cursor:
                    cursor.execute(count_query, query_params)
                    total_records = cursor.fetchone()[0]
                if total_records == 0:
                    logging.info("No new records found to import.")
                    return
                logging.info(f"Found {total_records:,} new records to import.")

                # Server-side cursor: only batch_size rows are held in memory at a time.
                with self.pg_conn.cursor(name=self._INCREMENTAL_CURSOR_NAME) as cursor:
                    cursor.execute(data_query, query_params)
                    total_imported, total_failed, latest_ts = self._import_batches(
                        cursor, total_records, include_date=True, continue_on_error=True
                    )

                if total_failed:
                    logging.warning(
                        f"{total_failed} records in failed batches were skipped and "
                        "will not be retried by later incremental imports."
                    )

                # Always update the timestamp after a successful query,
                # even if no records were inserted (e.g. all duplicates).
                # This prevents re-querying the same records every cycle.
                if latest_ts is None:
                    latest_ts = self.datastore.fetch_latest_date_uploaded(self.pg_conn)
                if latest_ts:
                    self.datastore.update_import_timestamp(latest_ts)
                    logging.info(f"Updated import timestamp to: {latest_ts}")
                else:
                    logging.warning("Could not fetch latest date_uploaded from Postgres.")
                logging.info(f"Incremental import complete. New records imported: {total_imported}")
        except psycopg2.Error as e:
            logging.error(f"PostgreSQL query error: {e}")
        finally:
            if self.pg_conn:
                self.pg_conn.close()
# -----------------------------------------------------------------------------
# Main entry point for the script
# -----------------------------------------------------------------------------
def _redact_conn_string(conn_string):
    """
    Return ``conn_string`` with any password replaced by ``***``, for logging.

    Handles the URI form (``postgresql://user:secret@host/db``) and the libpq
    key/value or query-parameter form (``password=secret``). Over-redaction is
    preferred to leaking a credential.
    """
    import re

    if not conn_string:
        return conn_string
    # URI userinfo: "://user:password@" -> "://user:***@"
    redacted = re.sub(r"(://[^:/@\s]*:)[^@\s]*@", r"\1***@", conn_string)
    # key/value form, quoted or unquoted: "password=secret" -> "password=***"
    return re.sub(
        r"(\bpassword\s*=\s*)('(?:[^'\\]|\\.)*'|\S+)",
        r"\1***",
        redacted,
        flags=re.IGNORECASE,
    )


@click.command()
@click.option("--incremental", is_flag=True, help="Run incremental import (fetch only new records since last import)")
def main(incremental):
    """
    Script to import data from a PostgreSQL database into a local SQLite datastore.
    Configuration is read from a .env file.
    Use --incremental flag to import only records uploaded since the last import.
    """
    # Load environment variables from a .env file
    load_dotenv()
    pg_conn_string = os.getenv("PG_CONN_STRING")
    db_path = os.getenv("DB_PATH")

    # Never log the raw connection string: it may embed the database password.
    logging.info("CAA importer config:")
    logging.info(" pg conn: '%s'", _redact_conn_string(pg_conn_string))
    logging.info(" db path: '%s'", db_path)
    logging.info(" incremental: %s", incremental)

    # Check DB_PATH before using it: os.path.exists(None) raises TypeError.
    if not db_path:
        click.echo("Error: DB_PATH environment variable is not set.", err=True)
        sys.exit(-1)

    # For full import, check that database doesn't exist
    if not incremental and os.path.exists(db_path):
        logging.error(
            "The DB file %s exists. Please remove it before running this command, or use --incremental flag.", db_path
        )
        sys.exit(-1)
    # For incremental import, database should exist
    if incremental and not os.path.exists(db_path):
        logging.error("Database file %s not found. Run a full import first (without --incremental flag).", db_path)
        sys.exit(-1)

    # An optional consul-generated config module overrides the PG connection string.
    try:
        import consul_config

        if getattr(consul_config, "PG_CONN_STRING", None):
            pg_conn_string = consul_config.PG_CONN_STRING
            logging.info("pg conn string: '%s'", _redact_conn_string(pg_conn_string))
    except ImportError:
        pass

    if not pg_conn_string:
        click.echo("Error: PG_CONN_STRING environment variable is not set.", err=True)
        sys.exit(-1)

    importer = CAAImporter(pg_conn_string=pg_conn_string, db_path=db_path, batch_size=1000)
    if incremental:
        importer.run_import_incremental()
    else:
        importer.run_import()


if __name__ == "__main__":
    main()