Feat/cal itp import#1670
Open
ianktc wants to merge 9 commits into
Open
Conversation
Contributor
There was a problem hiding this comment.
Pull request overview
Adds a new Cal-ITP data import pipeline to the tasks executor, including the import implementation, CKAN query, tests, and a scheduled monthly execution in GCP.
Changes:
- Introduces Cal-ITP import handler + CKAN SQL query for retrieving feed records.
- Registers the new
cal_itp_importtask in the tasks executor and adds unit/e2e tests. - Adds a monthly Cloud Scheduler job to invoke the Cal-ITP import task.
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 8 comments.
Show a summary per file
| File | Description |
|---|---|
| infra/functions-python/main.tf | Adds a monthly Cloud Scheduler job to call the tasks executor with cal_itp_import. |
| functions-python/tasks_executor/src/main.py | Registers the new cal_itp_import task and handler. |
| functions-python/tasks_executor/src/tasks/data_import/cal_itp/import_cal_itp_feeds.py | Implements Cal-ITP dataset retrieval, filtering, upsert logic, and orchestration/commit hooks. |
| functions-python/tasks_executor/src/tasks/data_import/cal_itp/ckan_query.sql | Provides the CKAN datastore SQL used to retrieve Cal-ITP feed records. |
| functions-python/tasks_executor/tests/tasks/data_import/test_cal_itp_import.py | Adds helper/unit tests and an end-to-end DB test for the Cal-ITP import flow. |
davidgamez
reviewed
May 12, 2026
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary:
Closes #1642
This pull request introduces support for importing data from Cal-ITP into the system involving an import handler and its tests.
Cal-ITP Import Feature:
functions-python/tasks_executor/src/tasks/data_import/cal_itp/import_cal_itp_feeds.pycontains import handler and associated import logicfunctions-python/tasks_executor/src/tasks/data_import/cal_itp/ckan_query.sqlis the CKAN API SQL query to retrieve feeds from Cal-ITPfunctions-python/tasks_executor/tests/tasks/data_import/cal_itp/test_cal_itp_import.pycontains the associated unit and e2e testsinfra/functions-python/main.tfincludes the Google Cloud Scheduler job to run the import monthlyfunctions-python/tasks_executor/src/main.pyincludes the handler to the task listOut of scope:
Redirecting MDB feeds to new Cal-ITP: the redirect and csv defining redirect links will be included in follow up PR
Include licensing for Cal-ITP feeds (follow up PR after confirmation with Cal-ITP)
Cal-ITP Import — Execution Flow & Design Doc
Overview
The Cal-ITP import pipeline fetches GTFS schedule and real-time feeds from the California Integrated Travel Project (Cal-ITP) CKAN API and upserts them into the Mobility Feed API database. It runs as a scheduled HTTP Cloud Function (tasks_executor), triggered monthly by Cloud Scheduler, and fans out to dataset download and web revalidation tasks on completion.
Architecture Diagram
Step-by-Step Execution Flow
1. Cloud Scheduler Trigger
Terraform resource:
google_cloud_scheduler_job.cal_itp_import_schedule(
infra/functions-python/main.tf~line 564)0 0 3 * *— 3 AM UTC, monthly (1st of each month)tasks_executorCloud Function URLfunctions_service_account{"task": "cal_itp_import", "payload": {"dry_run": false}}2. Cloud Function — tasks_executor
Terraform resource:
google_cloudfunctions2_function.tasks_executor(
infra/functions-python/main.tf~line 1090)tasks_executor(inmain.py)FEEDS_DATABASE_URL,FEEDS_CREDENTIALS,WEB_APP_REVALIDATE_SECRETKey env vars set by Terraform:
DATASET_PROCESSING_TOPIC_NAME→datasets-batch-topic-{env}(Pub/Sub)WEB_REVALIDATION_QUEUE→ Cloud Tasks queue nameWEB_APP_REVALIDATE_URL→ web app revalidation endpointPROJECT_ID,ENVIRONMENT,SERVICE_ACCOUNT_EMAIL3. HTTP Router —
main.py:tasks_executor()The function parses
request.get_json()for a"task"key and dispatches to the registered handler:For unknown tasks → HTTP 400. For handler exceptions → HTTP 500.
4.
import_cal_itp_handler(payload)File:
import_cal_itp_feeds.pydry_runfrom payload (default:True)_import_cal_itp(dry_run=dry_run){ "message": "Cal-ITP import executed successfully.", "created_gtfs": 12, "updated_gtfs": 5, "created_rt": 8, "total_processed_items": 120, "params": {"dry_run": false} }5.
_import_cal_itp(db_session, dry_run)— OrchestratorDecorated with
@with_db_session(manages SQLAlchemy session lifecycle).Batch size is controlled by
COMMIT_BATCH_SIZEenv var (default: 5).6. Data Fetching —
_fetch_cal_itp_datasets()https://data.ca.gov/api/3/action/datastore_search_sql?sql=<encoded>ckan_query.sql— joins 4 CKAN datasets:gtfs_datasetse4ca5bd4-...servicesdbacfa9f-...provider_gtfs_dataebe116fb-...organizations677e1271-...is_public = 'Yes'AND at least one feed URL present7. Record Filtering —
_filter_cal_itp_records()Records are grouped by
service_source_record_id. For each group:Bay Area 511 services (detected by "Bay Area 511 Regional" in any name column):
Regional Precursor Feed(preferred)Regional SubfeedCombined Regional FeedAll other services:
gtfs_service_data_customer_facing == true/yes/18. Per-Dataset Processing —
_process_cal_itp_dataset()For each filtered dataset record:
a. Resource Expansion
Expand one dataset dict into 1–4 resource dicts:
schedule_dataset_urlpresent)"entity_type": ["{rt_type}"](a single-element list)Resources are sorted: schedule first, then RT feeds.
b. Validation —
_validate_required_cal_itp_fields()For each resource, validates required fields exist and are non-empty:
schedule_source_record_id,schedule_gtfs_dataset_name,schedule_dataset_url{type}_source_record_id,{type}_gtfs_dataset_name,{type}_dataset_urlRaises
InvalidCalItpFeedErroron failure; resource is skipped.c. Stable ID Generation
Type codes:
s(schedule),tu(trip updates),vp(vehicle positions),sa(service alerts)d. Location Mapping —
_get_cal_itp_locations()Maps
caltrans_district_name→LocationDB row:United States(hardcoded)California(hardcoded)caltrans_district_namee. GTFS Schedule Feed Processing
_probe_head_format()— verify URL returns a ZIP_delete_and_recreate_feed_if_type_changed()— handles type conflicts (delete + flush + recreate)(stable_id, feed_name, provider, producer_url)feed_name,provider,producer_url,operational_status,locations_ensure_cal_itp_external_id()— ensureExternalidrow exists for this feedf. GTFS-RT Feed Processing
_get_entity_types_from_resource()— maps RT type string to entity type codesENTITY_TYPES_MAP:{"trip_updates": "tu", "vehicle_positions": "vp", "service_alerts": "sa"}get_or_create_entity_type(db_session, et)— upsertsEntitytyperowsstatic_current_feedreferencestatic_refsandentity_typesg. Error Handling Per Resource
IntegrityError→ rollback to savepoint, log, continueException→ rollback to savepoint, log, continue9. Stale Feed Deprecation —
_deprecate_stale_feeds()After all datasets are processed:
Feedrows wherestable_id LIKE 'cal_itp-%'processed_stable_ids→ setstatus = "deprecated"10. Commit & Downstream Triggers —
commit_changes()On
IntegrityError: rollback, log, re-raise (propagates to caller).11. Dry Run Mode
When
dry_run=True(the default when called without a payload):Key Design Decisions
cal_itp-{id}-{type})Entity Types Map
trip_updatestuvehicle_positionsvpservice_alertssa