diff --git a/docs/source/guide/storage.md b/docs/source/guide/storage.md index a6263f117b6f..6cf994689763 100644 --- a/docs/source/guide/storage.md +++ b/docs/source/guide/storage.md @@ -6,7 +6,7 @@ tier: all order: 151 order_enterprise: 151 meta_title: Cloud and External Storage Integration -meta_description: "Label Studio Documentation for integrating Amazon AWS S3, Google Cloud Storage, Microsoft Azure, Redis, and local file directories with Label Studio." +meta_description: "Label Studio Documentation for integrating Amazon AWS S3, Google Cloud Storage, Microsoft Azure, Backblaze B2, Redis, and local file directories with Label Studio." section: "Import & Export" --- @@ -23,6 +23,7 @@ Integrate popular cloud and external storage systems with Label Studio to collec | [Google Cloud Storage WIF Auth](https://docs.humansignal.com/guide/storage#Google-Cloud-Storage-with-Workload-Identity-Federation-WIF) | ❌ | ✅ | | [Microsoft Azure Blob Storage](#Microsoft-Azure-Blob-storage) | ✅ | ✅ | | [Microsoft Azure Blob Storage with Service Principal](https://docs.humansignal.com/guide/storage#Azure-Blob-Storage-with-Service-Principal-authentication) | ❌ | ✅ | +| [Backblaze B2](#Backblaze-B2) | ✅ | ✅ | | [Databricks Files (UC Volumes)](https://docs.humansignal.com/guide/storage#Databricks-Files-UC-Volumes) | ❌ | ✅ | | [Redis database](#Redis-database)| ✅ | ✅ | | [Local storage](#Local-storage) | ✅ | ✅ | @@ -39,6 +40,7 @@ Integrate popular cloud and external storage systems with Label Studio to collec | [Google Cloud Storage WIF Auth](#Google-Cloud-Storage-with-Workload-Identity-Federation-WIF) | ❌ | ✅ | | [Microsoft Azure Blob Storage](#Microsoft-Azure-Blob-storage) | ✅ | ✅ | | [Microsoft Azure Blob Storage with Service Principal](#Azure-Blob-Storage-with-Service-Principal-authentication) | ❌ | ✅ | +| [Backblaze B2](#Backblaze-B2) | ✅ | ✅ | | [Databricks Files (UC Volumes)](#Databricks-Files-UC-Volumes) | ❌ | ✅ | | [Redis database](#Redis-database)| ✅ | ✅ | | [Local storage](#Local-storage) (on-prem only) | ✅ | ✅ | @@ -1348,6 +1350,138 @@ These are included in the built-in **Storage Blob Data Contributor** role. +## Backblaze B2 + +Connect your [Backblaze B2](https://www.backblaze.com/cloud-storage) bucket to Label Studio to retrieve labeling tasks or store completed annotations. Backblaze B2 provides S3-compatible object storage with predictable pricing and no egress fees. + +For details about how Label Studio secures access to cloud storage, see [Secure access to cloud storage](security.html#Secure-access-to-cloud-storage). + +### Prerequisites + +Before you set up your Backblaze B2 bucket with Label Studio, you need: + +1. A Backblaze B2 account +2. An Application Key with appropriate permissions +3. Your B2 bucket name and endpoint URL + +### Configure access to your Backblaze B2 bucket + +1. **Create an Application Key:** + - Log in to your Backblaze account + - Navigate to **App Keys** in the left sidebar + - Click **Add a New Application Key** + - Set a name for your key (e.g., "Label Studio") + - Choose the bucket you want to use (or "All" for all buckets) + - Select capabilities: + - For **Source storage**: Enable `listBuckets`, `listFiles`, and `readFiles` + - For **Target storage**: Add `writeFiles` and optionally `deleteFiles` (if you want to sync deletions) + - Click **Create New Key** + - **Important**: Copy both the **Application Key ID** and **Application Key** immediately - the secret key is only shown once + +2. 
**Get your S3-compatible endpoint URL:**
+    - Backblaze B2 provides S3-compatible endpoints in the format:
+      ```
+      https://s3.<region>.backblazeb2.com
+      ```
+    - Common regions:
+      - `us-west-004` (US West)
+      - `us-west-002` (US West - Phoenix)
+      - `us-east-005` (US East)
+      - `eu-central-003` (EU Central - Amsterdam)
+    - You can find your region in the bucket details page of the Backblaze B2 console
+
+3. **Set up CORS (for browser access to media files):**
+    - In Backblaze B2, navigate to your bucket settings
+    - Click **Bucket Settings** → **CORS Rules**
+    - Add the following CORS rule:
+
+    ```json
+    [
+      {
+        "allowedOrigins": [
+          "https://your-label-studio-domain.com"
+        ],
+        "allowedHeaders": [
+          "*"
+        ],
+        "allowedOperations": [
+          "s3_get"
+        ],
+        "maxAgeSeconds": 3600
+      }
+    ]
+    ```
+
+    Replace `https://your-label-studio-domain.com` with your Label Studio URL. For local development, you can use `http://localhost:8080`.
+
+### Add Backblaze B2 as source storage
+
+1. In the Label Studio UI, open a project.
+2. Go to **Settings > Cloud Storage**.
+3. Click **Add Source Storage**.
+4. Select **Backblaze B2** from the storage type dropdown.
+5. Enter the following:
+    - **Bucket Name**: Your B2 bucket name
+    - **Endpoint URL**: Your S3-compatible endpoint (e.g., `https://s3.us-west-004.backblazeb2.com`)
+    - **Application Key ID**: The Application Key ID from step 1
+    - **Application Key**: The secret Application Key from step 1
+    - **Region Name** (optional): The region code (e.g., `us-west-004`)
+    - **Bucket Prefix** (optional): Specify a folder path to import files from a specific subfolder
+    - **File Filter Regex** (optional): Filter files by name pattern
+    - **Import Method**:
+      - Choose **Files** to automatically create a task from each file
+      - Choose **Tasks** to import JSON or JSONL files as task definitions
+    - **Use pre-signed URLs**: Toggle on (recommended) so browsers load media directly from B2 through short-lived URLs; toggle off to proxy files through Label Studio instead
+    - **Presigned URL TTL**: Time in minutes before pre-signed URLs expire (default: 15 minutes)
+6. Click **Test Connection** to verify the settings.
+7. Click **Add Storage**.
+8. Click **Sync Storage** to import tasks from your B2 bucket.
+
+### Add Backblaze B2 as target storage
+
+1. In the Label Studio UI, open a project.
+2. Go to **Settings > Cloud Storage**.
+3. Click **Add Target Storage**.
+4. Select **Backblaze B2** from the storage type dropdown.
+5. Enter the following:
+    - **Bucket Name**: Your B2 bucket name
+    - **Endpoint URL**: Your S3-compatible endpoint (e.g., `https://s3.us-west-004.backblazeb2.com`)
+    - **Application Key ID**: The Application Key ID with write permissions
+    - **Application Key**: The secret Application Key with write permissions
+    - **Region Name** (optional): The region code (e.g., `us-west-004`)
+    - **Bucket Prefix** (optional): Specify a folder path for exported annotations
+    - **Can delete objects**: Toggle on if you want deletions in Label Studio to sync to B2
+6. Click **Test Connection** to verify the settings.
+7. Click **Add Storage**.
+
+Annotations are exported to B2 automatically when you create or update them.
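+
+If a connection test fails, it can help to check the key and endpoint outside Label Studio first. The snippet below is a minimal sketch using boto3 (the same S3-compatible client Label Studio uses under the hood); every value in it is a placeholder to replace with your own:
+
+```python
+import boto3
+from botocore.client import Config
+
+# Placeholders - substitute your own endpoint, region, keys, and bucket
+client = boto3.client(
+    's3',
+    endpoint_url='https://s3.us-west-004.backblazeb2.com',
+    region_name='us-west-004',
+    aws_access_key_id='your-application-key-id',
+    aws_secret_access_key='your-application-key',
+    config=Config(signature_version='s3v4'),
+)
+
+# Lists a few objects - succeeds only if the key can list the bucket
+for obj in client.list_objects_v2(Bucket='your-bucket-name', MaxKeys=5).get('Contents', []):
+    print(obj['Key'], obj['Size'])
+
+# Presigning happens locally and always "succeeds"; fetch the printed URL
+# (e.g. with curl) to confirm the key can actually serve the object
+print(client.generate_presigned_url(
+    'get_object',
+    Params={'Bucket': 'your-bucket-name', 'Key': 'path/to/a/file'},
+    ExpiresIn=60,
+))
+```
+
+If this script works but Label Studio cannot connect, re-check the form values (bucket name, prefix, endpoint) rather than the key itself.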
+ +### Troubleshooting Backblaze B2 + +If you experience issues with Backblaze B2 storage: + +- **Connection test fails**: + - Verify your Application Key ID and Application Key are correct + - Ensure the Application Key has the required capabilities for your use case + - Check that the endpoint URL matches your bucket's region + +- **Files not appearing**: + - Verify the bucket name is spelled correctly (case-sensitive) + - Check that your bucket prefix matches the actual folder structure + - Ensure your Application Key has `listFiles` capability + +- **Cannot access media files**: + - Verify CORS is configured correctly for your Label Studio domain + - If using presigned URLs, ensure the Application Key has `shareFiles` capability + - Try toggling "Use pre-signed URLs" off to use proxy mode instead + +- **Annotations not exporting**: + - Verify the Application Key has `writeFiles` capability + - Check that the target storage is configured (not just source storage) + - Look for export errors in the Label Studio logs + +For additional support, consult the [Backblaze B2 documentation](https://www.backblaze.com/docs/cloud-storage) or contact Backblaze support. + ## Redis database You can also store your tasks and annotations in a [Redis database](https://redis.io/). You must store the tasks and annotations in different databases. You might want to use a Redis database if you find that relying on a file-based cloud storage connection is slow for your datasets. diff --git a/label_studio/core/settings/base.py b/label_studio/core/settings/base.py index effefd68a400..7f375f75021a 100644 --- a/label_studio/core/settings/base.py +++ b/label_studio/core/settings/base.py @@ -788,6 +788,15 @@ def collect_versions_dummy(**kwargs): ], ) +# Custom B2 endpoints on these domains will get detailed error reporting +B2_TRUSTED_STORAGE_DOMAINS = get_env_list( + 'B2_TRUSTED_STORAGE_DOMAINS', + [ + 'backblazeb2.com', + 'backblaze.com', + ], +) + REAL_HOSTNAME = os.getenv('HOSTNAME') # we have to use getenv, because we don't use LABEL_STUDIO_ prefix GCS_CLOUD_STORAGE_FORCE_DEFAULT_CREDENTIALS = get_bool_env('GCS_CLOUD_STORAGE_FORCE_DEFAULT_CREDENTIALS', False) PUBLIC_API_DOCS = get_bool_env('PUBLIC_API_DOCS', False) diff --git a/label_studio/io_storages/b2/__init__.py b/label_studio/io_storages/b2/__init__.py new file mode 100644 index 000000000000..31ab45f5acec --- /dev/null +++ b/label_studio/io_storages/b2/__init__.py @@ -0,0 +1,3 @@ +"""This file and its contents are licensed under the Apache License 2.0. Please see the included NOTICE for copyright information and LICENSE for a copy of the license. +""" + diff --git a/label_studio/io_storages/b2/api.py b/label_studio/io_storages/b2/api.py new file mode 100644 index 000000000000..d9ee5b76584a --- /dev/null +++ b/label_studio/io_storages/b2/api.py @@ -0,0 +1,321 @@ +"""This file and its contents are licensed under the Apache License 2.0. Please see the included NOTICE for copyright information and LICENSE for a copy of the license. 
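+
+REST API endpoints for Backblaze B2 import and export storage connections.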
+""" +from django.utils.decorators import method_decorator +from drf_spectacular.types import OpenApiTypes +from drf_spectacular.utils import OpenApiParameter, OpenApiResponse, extend_schema +from io_storages.api import ( + ExportStorageDetailAPI, + ExportStorageFormLayoutAPI, + ExportStorageListAPI, + ExportStorageSyncAPI, + ExportStorageValidateAPI, + ImportStorageDetailAPI, + ImportStorageFormLayoutAPI, + ImportStorageListAPI, + ImportStorageSyncAPI, + ImportStorageValidateAPI, +) +from io_storages.b2.models import B2ExportStorage, B2ImportStorage +from io_storages.b2.serializers import B2ExportStorageSerializer, B2ImportStorageSerializer + +from .openapi_schema import ( + _b2_export_storage_schema, + _b2_export_storage_schema_with_id, + _b2_import_storage_schema, + _b2_import_storage_schema_with_id, +) + + +@method_decorator( + name='get', + decorator=extend_schema( + tags=['Storage: B2'], + summary='List B2 import storage', + description='Get a list of all Backblaze B2 import storage connections.', + parameters=[ + OpenApiParameter( + name='project', + type=OpenApiTypes.INT, + location='query', + description='Project ID', + ), + ], + extensions={ + 'x-fern-sdk-group-name': ['import_storage', 'b2'], + 'x-fern-sdk-method-name': 'list', + 'x-fern-audiences': ['public'], + }, + ), +) +@method_decorator( + name='post', + decorator=extend_schema( + tags=['Storage: B2'], + summary='Create new B2 import storage', + description='Create new Backblaze B2 import storage connection', + request={ + 'application/json': _b2_import_storage_schema, + }, + extensions={ + 'x-fern-sdk-group-name': ['import_storage', 'b2'], + 'x-fern-sdk-method-name': 'create', + 'x-fern-audiences': ['public'], + }, + ), +) +class B2ImportStorageListAPI(ImportStorageListAPI): + """API for listing and creating B2 import storage connections.""" + + queryset = B2ImportStorage.objects.all() + serializer_class = B2ImportStorageSerializer + + +@method_decorator( + name='get', + decorator=extend_schema( + tags=['Storage: B2'], + summary='Get B2 import storage', + description='Get a specific Backblaze B2 import storage connection.', + request=None, + extensions={ + 'x-fern-sdk-group-name': ['import_storage', 'b2'], + 'x-fern-sdk-method-name': 'get', + 'x-fern-audiences': ['public'], + }, + ), +) +@method_decorator( + name='patch', + decorator=extend_schema( + tags=['Storage: B2'], + summary='Update B2 import storage', + description='Update a specific Backblaze B2 import storage connection.', + request={ + 'application/json': _b2_import_storage_schema, + }, + extensions={ + 'x-fern-sdk-group-name': ['import_storage', 'b2'], + 'x-fern-sdk-method-name': 'update', + 'x-fern-audiences': ['public'], + }, + ), +) +@method_decorator( + name='delete', + decorator=extend_schema( + tags=['Storage: B2'], + summary='Delete B2 import storage', + description='Delete a specific Backblaze B2 import storage connection.', + request=None, + extensions={ + 'x-fern-sdk-group-name': ['import_storage', 'b2'], + 'x-fern-sdk-method-name': 'delete', + 'x-fern-audiences': ['public'], + }, + ), +) +class B2ImportStorageDetailAPI(ImportStorageDetailAPI): + """API for retrieving, updating, and deleting a specific B2 import storage.""" + + queryset = B2ImportStorage.objects.all() + serializer_class = B2ImportStorageSerializer + + +@method_decorator( + name='post', + decorator=extend_schema( + tags=['Storage: B2'], + summary='Sync B2 import storage', + description='Sync tasks from a Backblaze B2 import storage connection.', + parameters=[ + OpenApiParameter( + 
name='id', + type=OpenApiTypes.INT, + location='path', + description='Storage ID', + ), + ], + request=None, + extensions={ + 'x-fern-sdk-group-name': ['import_storage', 'b2'], + 'x-fern-sdk-method-name': 'sync', + 'x-fern-audiences': ['public'], + }, + ), +) +class B2ImportStorageSyncAPI(ImportStorageSyncAPI): + """API for syncing a B2 import storage.""" + + serializer_class = B2ImportStorageSerializer + + +@method_decorator( + name='post', + decorator=extend_schema( + tags=['Storage: B2'], + summary='Validate B2 import storage', + description='Validate a specific Backblaze B2 import storage connection.', + request={ + 'application/json': _b2_import_storage_schema_with_id, + }, + responses={200: OpenApiResponse(description='Validation successful')}, + extensions={ + 'x-fern-sdk-group-name': ['import_storage', 'b2'], + 'x-fern-sdk-method-name': 'validate', + 'x-fern-audiences': ['public'], + }, + ), +) +class B2ImportStorageValidateAPI(ImportStorageValidateAPI): + """API for validating a B2 import storage connection.""" + + serializer_class = B2ImportStorageSerializer + + +@method_decorator( + name='post', + decorator=extend_schema( + tags=['Storage: B2'], + summary='Validate B2 export storage', + description='Validate a specific Backblaze B2 export storage connection.', + request={ + 'application/json': _b2_export_storage_schema_with_id, + }, + responses={200: OpenApiResponse(description='Validation successful')}, + extensions={ + 'x-fern-sdk-group-name': ['export_storage', 'b2'], + 'x-fern-sdk-method-name': 'validate', + 'x-fern-audiences': ['public'], + }, + ), +) +class B2ExportStorageValidateAPI(ExportStorageValidateAPI): + """API for validating a B2 export storage connection.""" + + serializer_class = B2ExportStorageSerializer + + +@method_decorator( + name='get', + decorator=extend_schema( + tags=['Storage: B2'], + summary='Get all B2 export storage', + description='Get a list of all Backblaze B2 export storage connections.', + parameters=[ + OpenApiParameter( + name='project', + type=OpenApiTypes.INT, + location='query', + description='Project ID', + ), + ], + extensions={ + 'x-fern-sdk-group-name': ['export_storage', 'b2'], + 'x-fern-sdk-method-name': 'list', + 'x-fern-audiences': ['public'], + }, + ), +) +@method_decorator( + name='post', + decorator=extend_schema( + tags=['Storage: B2'], + summary='Create B2 export storage', + description='Create a new Backblaze B2 export storage connection to store annotations.', + request={ + 'application/json': _b2_export_storage_schema, + }, + extensions={ + 'x-fern-sdk-group-name': ['export_storage', 'b2'], + 'x-fern-sdk-method-name': 'create', + 'x-fern-audiences': ['public'], + }, + ), +) +class B2ExportStorageListAPI(ExportStorageListAPI): + """API for listing and creating B2 export storage connections.""" + + queryset = B2ExportStorage.objects.all() + serializer_class = B2ExportStorageSerializer + + +@method_decorator( + name='get', + decorator=extend_schema( + tags=['Storage: B2'], + summary='Get B2 export storage', + description='Get a specific Backblaze B2 export storage connection.', + request=None, + extensions={ + 'x-fern-sdk-group-name': ['export_storage', 'b2'], + 'x-fern-sdk-method-name': 'get', + 'x-fern-audiences': ['public'], + }, + ), +) +@method_decorator( + name='patch', + decorator=extend_schema( + tags=['Storage: B2'], + summary='Update B2 export storage', + description='Update a specific Backblaze B2 export storage connection.', + request={ + 'application/json': _b2_export_storage_schema, + }, + extensions={ + 
'x-fern-sdk-group-name': ['export_storage', 'b2'], + 'x-fern-sdk-method-name': 'update', + 'x-fern-audiences': ['public'], + }, + ), +) +@method_decorator( + name='delete', + decorator=extend_schema( + tags=['Storage: B2'], + summary='Delete B2 export storage', + description='Delete a specific Backblaze B2 export storage connection.', + request=None, + extensions={ + 'x-fern-sdk-group-name': ['export_storage', 'b2'], + 'x-fern-sdk-method-name': 'delete', + 'x-fern-audiences': ['public'], + }, + ), +) +class B2ExportStorageDetailAPI(ExportStorageDetailAPI): + """API for retrieving, updating, and deleting a specific B2 export storage.""" + + queryset = B2ExportStorage.objects.all() + serializer_class = B2ExportStorageSerializer + + +@method_decorator( + name='post', + decorator=extend_schema( + tags=['Storage: B2'], + summary='Sync B2 export storage', + description='Sync annotations to a Backblaze B2 export storage connection.', + request=None, + extensions={ + 'x-fern-sdk-group-name': ['export_storage', 'b2'], + 'x-fern-sdk-method-name': 'sync', + 'x-fern-audiences': ['public'], + }, + ), +) +class B2ExportStorageSyncAPI(ExportStorageSyncAPI): + """API for syncing a B2 export storage.""" + + serializer_class = B2ExportStorageSerializer + + +class B2ImportStorageFormLayoutAPI(ImportStorageFormLayoutAPI): + """API for getting the form layout for B2 import storage.""" + pass + + +class B2ExportStorageFormLayoutAPI(ExportStorageFormLayoutAPI): + """API for getting the form layout for B2 export storage.""" + pass + diff --git a/label_studio/io_storages/b2/form_layout.yml b/label_studio/io_storages/b2/form_layout.yml new file mode 100644 index 000000000000..d4700f102ce2 --- /dev/null +++ b/label_studio/io_storages/b2/form_layout.yml @@ -0,0 +1,155 @@ +# Form layout configuration for Backblaze B2 Cloud Storage integration +# This defines the UI fields shown when creating/editing B2 storage connections + +# 1x3 grid - Basic information +title_bucket_prefix: &title_bucket_prefix + - type: text + name: title + label: Storage Name + required: true + - type: text + name: bucket + label: Bucket Name + allowEmpty: false + required: true + - type: text + name: prefix + label: Bucket Prefix (Folder Path) + +# 2x3 grid - B2 specific parameters for import +b2_params_import: &b2_params_import + - type: text + name: region_name + label: Region Name + placeholder: us-west-004 + tooltip: "B2 region (e.g., us-west-004, us-east-005, eu-central-003)" + - type: text + name: b2_endpoint_url + label: B2 Endpoint URL + placeholder: https://s3.us-west-004.backblazeb2.com + tooltip: "Your B2 S3-compatible endpoint URL" + - null + - type: password + name: b2_access_key_id + label: Application Key ID + autoComplete: "off" + skipAutofill: true + allowEmpty: false + protectedValue: true + tooltip: "Your B2 Application Key ID. Leave blank if already set up as an Environment Variable (B2_ACCESS_KEY_ID)." + - type: password + name: b2_secret_access_key + label: Application Key + autoComplete: "new-password" + skipAutofill: true + allowEmpty: false + protectedValue: true + tooltip: "Your B2 Application Key. Leave blank if already set up as an Environment Variable (B2_SECRET_ACCESS_KEY)." 
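+  # the "null" entry below is an empty placeholder cell in the 2x3 field grid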
+ - null + +# 2x3 grid - B2 specific parameters for export +b2_params_export: &b2_params_export + - type: text + name: region_name + label: Region Name + placeholder: us-west-004 + tooltip: "B2 region (e.g., us-west-004, us-east-005, eu-central-003)" + - type: text + name: b2_endpoint_url + label: B2 Endpoint URL + placeholder: https://s3.us-west-004.backblazeb2.com + tooltip: "Your B2 S3-compatible endpoint URL" + - null + - type: password + name: b2_access_key_id + label: Application Key ID + autoComplete: "off" + skipAutofill: true + allowEmpty: false + protectedValue: true + tooltip: "Your B2 Application Key ID" + - type: password + name: b2_secret_access_key + label: Application Key + autoComplete: "new-password" + skipAutofill: true + allowEmpty: false + protectedValue: true + tooltip: "Your B2 Application Key" + - null + + +ImportStorage: + # Title, Bucket, Prefix + - columnCount: 3 + fields: *title_bucket_prefix + # Regex filter + - columnCount: 1 + fields: + - type: text + name: regex_filter + label: File Filter Regex + placeholder: '.*csv or .*(jpe?g|png|tiff) or .\w+-\d+.text' + validators: + - regexp + # B2 specific params + - columnCount: 3 + fields: *b2_params_import + + # Import method selection + - columnCount: 1 + fields: + - type: select + name: use_blob_urls + label: Import method + description: Choose how to import your data from B2 storage + placeholder: "Select an option" + required: true + options: + - value: true + label: "Files - Automatically creates a task for each storage object (e.g. JPG, MP3, TXT)" + - value: false + label: "Tasks - Treat each JSON or JSONL file as a task definition (one or more tasks per file)" + + # 2 column grid - Presigned URLs + - columnCount: 2 + columns: + - width: 468 + fields: + - type: toggle + name: presign + label: "Use pre-signed URLs (On)\n Proxy through the platform (Off)" + description: "When pre-signed URLs are enabled, all data bypasses the platform and user browsers directly read data from B2" + value: true + - fields: + - type: counter + name: presign_ttl + label: Expire pre-signed URLs (minutes) + min: 1 + value: 15 + dependency: presign + # Recursive scan option + - columnCount: 1 + columns: + - fields: + - type: toggle + name: recursive_scan + label: Scan all sub-folders + description: Include files from all nested folders in the bucket + +ExportStorage: + # Title, Bucket, Prefix + - columnCount: 3 + fields: *title_bucket_prefix + # B2 specific params + - columnCount: 3 + fields: *b2_params_export + # Delete objects option + - columnCount: 1 + columns: + - fields: + - type: toggle + name: can_delete_objects + label: Can delete objects from storage + description: If unchecked, annotations will not be deleted from B2 storage when deleted from Label Studio + diff --git a/label_studio/io_storages/b2/models.py b/label_studio/io_storages/b2/models.py new file mode 100644 index 000000000000..8bdbe052a62d --- /dev/null +++ b/label_studio/io_storages/b2/models.py @@ -0,0 +1,460 @@ +"""This file and its contents are licensed under the Apache License 2.0. Please see the included NOTICE for copyright information and LICENSE for a copy of the license. 
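+
+Backblaze B2 import/export storage models, implemented on top of boto3's
+S3-compatible API.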
+""" + +import json +import logging +import re +from typing import Union +from urllib.parse import urlparse + +import boto3 +from core.redis import start_job_async_or_sync +from django.conf import settings +from django.db import models +from django.db.models.signals import post_save, pre_delete +from django.dispatch import receiver +from django.utils.translation import gettext_lazy as _ +from io_storages.b2.utils import ( + catch_and_reraise_from_none, + get_client_and_resource, + resolve_b2_url, +) +from io_storages.base_models import ( + ExportStorage, + ExportStorageLink, + ImportStorage, + ImportStorageLink, + ProjectStorageMixin, +) +from io_storages.utils import StorageObject, load_tasks_json, storage_can_resolve_bucket_url +from tasks.models import Annotation + +from label_studio.io_storages.b2.utils import B2 + +logger = logging.getLogger(__name__) +logging.getLogger('botocore').setLevel(logging.CRITICAL) +boto3.set_stream_logger(level=logging.INFO) + +# Cache for B2 clients to avoid re-creating them on every request +clients_cache = {} + + +class B2StorageMixin(models.Model): + """ + Mixin for Backblaze B2 Cloud Storage connection settings. + + B2 is S3-compatible, so we use boto3 with custom endpoints. + Unlike AWS S3, B2 requires: + - An explicit endpoint URL (e.g., https://s3.us-west-004.backblazeb2.com) + - Application Key ID and Application Key (equivalent to AWS credentials) + - No special session tokens or SSE KMS keys + """ + + bucket = models.TextField( + _('bucket'), + null=True, + blank=True, + help_text='B2 bucket name' + ) + prefix = models.TextField( + _('prefix'), + null=True, + blank=True, + help_text='B2 bucket prefix (folder path)' + ) + regex_filter = models.TextField( + _('regex_filter'), + null=True, + blank=True, + help_text='Cloud storage regex for filtering objects', + ) + use_blob_urls = models.BooleanField( + _('use_blob_urls'), + default=False, + help_text='Interpret objects as BLOBs and generate URLs', + ) + + # B2-specific credentials + # Note: These are called "Application Key ID" and "Application Key" in B2 UI, + # but we use AWS-compatible naming for boto3 compatibility + b2_access_key_id = models.TextField( + _('b2_access_key_id'), + null=True, + blank=True, + help_text='B2 Application Key ID (equivalent to AWS_ACCESS_KEY_ID)' + ) + b2_secret_access_key = models.TextField( + _('b2_secret_access_key'), + null=True, + blank=True, + help_text='B2 Application Key (equivalent to AWS_SECRET_ACCESS_KEY)', + ) + + # B2-specific endpoint configuration + # B2 uses region-specific endpoints like: https://s3.us-west-004.backblazeb2.com + b2_endpoint_url = models.TextField( + _('b2_endpoint_url'), + null=True, + blank=True, + help_text='B2 S3-compatible endpoint URL (e.g., https://s3.us-west-004.backblazeb2.com)' + ) + region_name = models.TextField( + _('region_name'), + null=True, + blank=True, + help_text='B2 Region (e.g., us-west-004, us-east-005, eu-central-003)' + ) + + @catch_and_reraise_from_none + def get_client_and_resource(self): + """ + Get or create cached boto3 client and resource for B2. + + B2 client initialization takes ~100ms, so we cache clients to avoid + performance issues when processing many tasks. 
+ """ + # Create cache key from connection parameters + cache_key = f'{self.b2_access_key_id}:{self.b2_secret_access_key}:{self.b2_endpoint_url}:{self.region_name}' + if cache_key in clients_cache: + return clients_cache[cache_key] + + # Create new client and resource + result = get_client_and_resource( + self.b2_access_key_id, + self.b2_secret_access_key, + self.b2_endpoint_url, + self.region_name, + ) + clients_cache[cache_key] = result + return result + + def get_client(self): + """Get boto3 client for B2.""" + client, _ = self.get_client_and_resource() + return client + + def get_client_and_bucket(self, validate_connection=True): + """Get boto3 client and bucket resource for B2.""" + client, b2 = self.get_client_and_resource() + if validate_connection: + self.validate_connection(client) + return client, b2.Bucket(self.bucket) + + @catch_and_reraise_from_none + def validate_connection(self, client=None): + """ + Validate connection to B2 bucket. + + For import storage, we check that at least one object exists with the prefix. + For export storage, we only check that the bucket exists (prefix can be empty). + """ + logger.debug('validate_connection') + if client is None: + client = self.get_client() + + # Check if this is an export storage class + is_export = 'Export' in self.__class__.__name__ + + if self.prefix: + logger.debug( + f'[Class {self.__class__.__name__}]: Test connection to B2 bucket {self.bucket} ' + f'with prefix {self.prefix} using ListObjectsV2 operation' + ) + result = client.list_objects_v2(Bucket=self.bucket, Prefix=self.prefix, MaxKeys=1) + # We expect 1 key with the prefix for imports. For exports it's okay if there are 0 with the prefix. + expected_keycount = 0 if is_export else 1 + if (keycount := result.get('KeyCount')) is None or keycount < expected_keycount: + raise KeyError(f'{self.url_scheme}://{self.bucket}/{self.prefix} not found.') + else: + logger.debug( + f'[Class {self.__class__.__name__}]: Test connection to B2 bucket {self.bucket} ' + f'using HeadBucket operation' + ) + client.head_bucket(Bucket=self.bucket) + + @property + def path_full(self): + """Full path to the storage location.""" + prefix = self.prefix or '' + return f'{self.url_scheme}://{self.bucket}/{prefix}' + + @property + def type_full(self): + """Human-readable storage type name.""" + return 'Backblaze B2' + + @catch_and_reraise_from_none + def get_bytes_stream(self, uri, range_header=None): + """ + Get file directly from B2 using iter_chunks without wrapper. + + This method forwards Range headers directly to B2 and returns the raw stream. + Note: The returned stream is NOT seekable and will break if seeking backwards. + + Args: + uri: The B2 URI of the file to retrieve + range_header: Optional HTTP Range header to forward to B2 + + Returns: + Tuple of (stream, content_type, metadata) where metadata contains + important B2 headers like ETag, ContentLength, etc. 
+ """ + # Parse URI to get bucket and key + parsed_uri = urlparse(uri, allow_fragments=False) + bucket_name = parsed_uri.netloc + key = parsed_uri.path.lstrip('/') + + # Get B2 client + client = self.get_client() + + try: + # Forward Range header to B2 if provided + request_params = {'Bucket': bucket_name, 'Key': key} + if range_header: + request_params['Range'] = range_header + + # Get the object from B2 + response = client.get_object(**request_params) + + # Extract metadata to return + metadata = { + 'ETag': response.get('ETag'), + 'ContentLength': response.get('ContentLength'), + 'ContentRange': response.get('ContentRange'), + 'LastModified': response.get('LastModified'), + 'StatusCode': response['ResponseMetadata']['HTTPStatusCode'], + } + + # Return the streaming body directly + return response['Body'], response.get('ContentType'), metadata + + except Exception as e: + logger.error(f'Error getting direct stream from B2 for uri {uri}: {e}', exc_info=True) + return None, None, {} + + class Meta: + abstract = True + + +class B2ImportStorageBase(B2StorageMixin, ImportStorage): + """ + Base class for B2 Import Storage. + + This class provides the core functionality for importing tasks from B2 buckets. + """ + + url_scheme = 'b2' + + presign = models.BooleanField( + _('presign'), + default=True, + help_text='Generate presigned URLs' + ) + presign_ttl = models.PositiveSmallIntegerField( + _('presign_ttl'), + default=1, + help_text='Presigned URLs TTL (in minutes)' + ) + recursive_scan = models.BooleanField( + _('recursive scan'), + default=False, + help_text=_('Perform recursive scan over the bucket content'), + ) + + @catch_and_reraise_from_none + def iter_objects(self): + """ + Iterate over objects in the B2 bucket. + + Yields: + B2 object instances + """ + _, bucket = self.get_client_and_bucket() + list_kwargs = {} + if self.prefix: + list_kwargs['Prefix'] = self.prefix.rstrip('/') + '/' + if not self.recursive_scan: + list_kwargs['Delimiter'] = '/' + bucket_iter = bucket.objects.filter(**list_kwargs).all() + regex = re.compile(str(self.regex_filter)) if self.regex_filter else None + for obj in bucket_iter: + key = obj.key + if key.endswith('/'): + logger.debug(key + ' is skipped because it is a folder') + continue + if regex and not regex.match(key): + logger.debug(key + ' is skipped by regex filter') + continue + logger.debug(f'B2 {key} has passed the regex filter') + yield obj + + @catch_and_reraise_from_none + def iter_keys(self): + """Iterate over object keys in the B2 bucket.""" + for obj in self.iter_objects(): + yield obj.key + + def get_unified_metadata(self, obj): + """Get standardized metadata for an object.""" + return { + 'key': obj.key, + 'last_modified': obj.last_modified, + 'size': obj.size, + } + + @catch_and_reraise_from_none + def scan_and_create_links(self): + """Scan B2 bucket and create task links.""" + return self._scan_and_create_links(B2ImportStorageLink) + + @catch_and_reraise_from_none + def get_data(self, key) -> list[StorageObject]: + """ + Get data from B2 for a given key. + + If use_blob_urls is True, return the B2 URL directly. + Otherwise, read and parse the JSON content. 
+ """ + uri = f'{self.url_scheme}://{self.bucket}/{key}' + if self.use_blob_urls: + data_key = settings.DATA_UNDEFINED_NAME + task = {data_key: uri} + return [StorageObject(key=key, task_data=task)] + + # read task json from bucket and validate it + _, b2 = self.get_client_and_resource() + bucket = b2.Bucket(self.bucket) + obj = b2.Object(bucket.name, key).get()['Body'].read() + return load_tasks_json(obj, key) + + @catch_and_reraise_from_none + def generate_http_url(self, url): + """Generate HTTP URL (presigned or base64) for a B2 URL.""" + return resolve_b2_url(url, self.get_client(), self.presign, expires_in=self.presign_ttl * 60) + + @catch_and_reraise_from_none + def can_resolve_url(self, url: Union[str, None]) -> bool: + """Check if this storage can resolve the given URL.""" + return storage_can_resolve_bucket_url(self, url) + + @catch_and_reraise_from_none + def get_blob_metadata(self, key): + """Get metadata for a blob in B2.""" + return B2.get_blob_metadata( + key, + self.bucket, + b2_access_key_id=self.b2_access_key_id, + b2_secret_access_key=self.b2_secret_access_key, + b2_endpoint_url=self.b2_endpoint_url, + region_name=self.region_name, + ) + + class Meta: + abstract = True + + +class B2ImportStorage(ProjectStorageMixin, B2ImportStorageBase): + """Concrete model for B2 Import Storage.""" + + class Meta: + abstract = False + + +class B2ExportStorage(B2StorageMixin, ExportStorage): + """ + B2 Export Storage for saving annotations. + + This storage saves annotations to a B2 bucket in JSON format. + """ + + @catch_and_reraise_from_none + def save_annotation(self, annotation): + """Save a single annotation to B2.""" + client, b2 = self.get_client_and_resource() + logger.debug(f'Creating new object on {self.__class__.__name__} Storage {self} for annotation {annotation}') + ser_annotation = self._get_serialized_data(annotation) + + # get key that identifies this object in storage + key = B2ExportStorageLink.get_key(annotation) + key = str(self.prefix) + '/' + key if self.prefix else key + + # put object into storage + # Note: B2 doesn't support AWS SSE KMS keys, so we use basic server-side encryption + additional_params = {} + + # B2 supports server-side encryption (AES-256) automatically + # No need to explicitly set it like with AWS + + b2.Object(self.bucket, key).put(Body=json.dumps(ser_annotation), **additional_params) + + # create link if everything ok + B2ExportStorageLink.create(annotation, self) + + @catch_and_reraise_from_none + def delete_annotation(self, annotation): + """Delete an annotation from B2.""" + client, b2 = self.get_client_and_resource() + logger.debug(f'Deleting object on {self.__class__.__name__} Storage {self} for annotation {annotation}') + + # get key that identifies this object in storage + key = B2ExportStorageLink.get_key(annotation) + key = str(self.prefix) + '/' + key if self.prefix else key + + # delete object from storage + b2.Object(self.bucket, key).delete() + + # delete link if everything ok + B2ExportStorageLink.objects.filter(storage=self, annotation=annotation).delete() + + +def async_export_annotation_to_b2_storages(annotation): + """Async function to export annotation to all B2 export storages.""" + project = annotation.project + if hasattr(project, 'io_storages_b2exportstorages'): + for storage in project.io_storages_b2exportstorages.all(): + logger.debug(f'Export {annotation} to B2 storage {storage}') + storage.save_annotation(annotation) + + +@receiver(post_save, sender=Annotation) +def export_annotation_to_b2_storages(sender, instance, 
**kwargs): + """Signal handler to export annotation to B2 when saved.""" + storages = getattr(instance.project, 'io_storages_b2exportstorages', None) + if storages and storages.exists(): # avoid excess jobs in rq + start_job_async_or_sync(async_export_annotation_to_b2_storages, instance) + + +@receiver(pre_delete, sender=Annotation) +def delete_annotation_from_b2_storages(sender, instance, **kwargs): + """Signal handler to delete annotation from B2 when deleted.""" + links = B2ExportStorageLink.objects.filter(annotation=instance) + for link in links: + storage = link.storage + if storage.can_delete_objects: + logger.debug(f'Delete {instance} from B2 storage {storage}') + storage.delete_annotation(instance) + + +class B2ImportStorageLink(ImportStorageLink): + """Link between a Task and B2 Import Storage.""" + + storage = models.ForeignKey(B2ImportStorage, on_delete=models.CASCADE, related_name='links') + + @classmethod + def exists(cls, key, storage): + """Check if a link already exists for this key and storage.""" + storage_link_exists = super(B2ImportStorageLink, cls).exists(key, storage) + # TODO: this is a workaround to be compatible with old keys version - remove it later + prefix = str(storage.prefix) or '' + return ( + storage_link_exists + or cls.objects.filter(key=prefix + key, storage=storage.id).exists() + or cls.objects.filter(key=prefix + '/' + key, storage=storage.id).exists() + ) + + +class B2ExportStorageLink(ExportStorageLink): + """Link between an Annotation and B2 Export Storage.""" + + storage = models.ForeignKey(B2ExportStorage, on_delete=models.CASCADE, related_name='links') + diff --git a/label_studio/io_storages/b2/openapi_schema.py b/label_studio/io_storages/b2/openapi_schema.py new file mode 100644 index 000000000000..8021da8c977c --- /dev/null +++ b/label_studio/io_storages/b2/openapi_schema.py @@ -0,0 +1,95 @@ +"""This file and its contents are licensed under the Apache License 2.0. Please see the included NOTICE for copyright information and LICENSE for a copy of the license. +""" + +# Common B2 storage schema properties following OpenAPI 3.0 specification +_common_b2_storage_schema_properties = { + 'title': {'type': 'string', 'description': 'Storage title', 'maxLength': 2048}, + 'description': {'type': 'string', 'description': 'Storage description'}, + 'project': {'type': 'integer', 'description': 'Project ID'}, + 'bucket': {'type': 'string', 'description': 'B2 bucket name'}, + 'prefix': {'type': 'string', 'description': 'B2 bucket prefix (folder path)'}, + 'b2_access_key_id': { + 'type': 'string', + 'description': 'B2 Application Key ID (equivalent to AWS_ACCESS_KEY_ID)', + }, + 'b2_secret_access_key': { + 'type': 'string', + 'description': 'B2 Application Key (equivalent to AWS_SECRET_ACCESS_KEY)', + }, + 'b2_endpoint_url': { + 'type': 'string', + 'description': 'B2 S3-compatible endpoint URL (e.g., https://s3.us-west-004.backblazeb2.com)', + }, + 'region_name': { + 'type': 'string', + 'description': 'B2 Region (e.g., us-west-004, us-east-005, eu-central-003)', + }, +} + +# B2 import storage schema +_b2_import_storage_schema = { + 'type': 'object', + 'properties': { + 'regex_filter': { + 'type': 'string', + 'description': 'Cloud storage regex for filtering objects. You must specify it otherwise no objects will be imported.', + }, + 'use_blob_urls': { + 'type': 'boolean', + 'description': 'Interpret objects as BLOBs and generate URLs. For example, if your bucket contains images, you can use this option to generate URLs for these images. 
If set to False, it will read the content of the file and load it into Label Studio.', + 'default': False, + }, + 'presign': { + 'type': 'boolean', + 'description': 'Generate presigned URLs for secure access to private files', + 'default': True, + }, + 'presign_ttl': { + 'type': 'integer', + 'description': 'Presigned URL expiration time in minutes', + 'default': 1, + }, + 'recursive_scan': { + 'type': 'boolean', + 'description': 'Scan recursively through all subfolders', + 'default': False, + }, + **_common_b2_storage_schema_properties, + }, + 'required': [], +} + +# B2 import storage schema with ID +_b2_import_storage_schema_with_id = { + 'type': 'object', + 'properties': { + 'id': {'type': 'integer', 'description': 'Storage ID. If set, storage with specified ID will be updated'}, + **_b2_import_storage_schema['properties'], + }, + 'required': [], +} + +# B2 export storage schema +_b2_export_storage_schema = { + 'type': 'object', + 'properties': { + 'can_delete_objects': { + 'type': 'boolean', + 'description': 'Enable deletion of annotations from B2 when deleted from Label Studio', + 'default': False, + }, + **_common_b2_storage_schema_properties, + }, + 'required': [], +} + +# B2 export storage schema with ID +_b2_export_storage_schema_with_id = { + 'type': 'object', + 'properties': { + 'id': {'type': 'integer', 'description': 'Storage ID. If set, storage with specified ID will be updated'}, + **_b2_export_storage_schema['properties'], + }, + 'required': [], +} + diff --git a/label_studio/io_storages/b2/serializers.py b/label_studio/io_storages/b2/serializers.py new file mode 100644 index 000000000000..4bb16f286a75 --- /dev/null +++ b/label_studio/io_storages/b2/serializers.py @@ -0,0 +1,146 @@ +"""This file and its contents are licensed under the Apache License 2.0. Please see the included NOTICE for copyright information and LICENSE for a copy of the license. +""" +import logging +import os + +from botocore.exceptions import ClientError, ParamValidationError +from botocore.handlers import validate_bucket_name +from io_storages.b2.models import B2ExportStorage, B2ImportStorage +from io_storages.serializers import ExportStorageSerializer, ImportStorageSerializer +from rest_framework import serializers +from rest_framework.exceptions import ValidationError + +logger = logging.getLogger(__name__) + + +class B2StorageSerializerMixin: + """ + Mixin for B2 storage serializers. + + Handles secure field filtering and connection validation. + """ + + # These fields contain sensitive data and should not be returned in API responses + secure_fields = ['b2_access_key_id', 'b2_secret_access_key'] + + def to_representation(self, instance): + """ + Remove secure fields from API response. + + This ensures that B2 credentials are never exposed through the API. + """ + result = super().to_representation(instance) + for attr in self.secure_fields: + result.pop(attr, None) + return result + + def validate_bucket(self, value): + """ + Validate B2 bucket name. + + B2 bucket names follow similar rules to AWS S3. + """ + if not value: + return value + try: + validate_bucket_name({'Bucket': value}) + except ParamValidationError as exc: + raise ValidationError(exc.kwargs['report']) from exc + return value + + def validate(self, data): + """ + Validate the entire storage configuration. + + This performs a test connection to B2 to ensure credentials and + configuration are correct before saving. 
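+
+        When an ``id`` is supplied without a bound instance, blank secure
+        fields are backfilled from the stored record, so credentials do not
+        have to be re-submitted on every update.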
+ """ + data = super().validate(data) + if not data.get('bucket', None): + return data + + # Get or create storage instance for validation + storage = self.instance + if storage: + # Update existing storage with new data + for key, value in data.items(): + setattr(storage, key, value) + else: + # Create new storage instance + if 'id' in self.initial_data: + storage_object = self.Meta.model.objects.get(id=self.initial_data['id']) + for attr in self.secure_fields: + data[attr] = data.get(attr) or getattr(storage_object, attr) + storage = self.Meta.model(**data) + + # Validate connection to B2 + try: + storage.validate_connection() + except ParamValidationError: + raise ValidationError( + f'Wrong credentials for B2 bucket {storage.bucket}. ' + 'Please check your B2 Application Key ID and Application Key.' + ) + except ClientError as e: + error_code = e.response.get('Error', {}).get('Code') + http_status = e.response.get('ResponseMetadata', {}).get('HTTPStatusCode') + + # Handle authentication errors + if error_code in ['SignatureDoesNotMatch', '403'] or http_status == 403: + raise ValidationError( + f'Cannot connect to B2 bucket {storage.bucket} with specified credentials. ' + 'Please verify your B2 Application Key ID and Application Key are correct.' + ) + + # Handle bucket not found errors + if error_code in ['NoSuchBucket', '404'] or http_status == 404: + raise ValidationError( + f'Cannot find bucket {storage.bucket} in B2. ' + 'Please verify the bucket name is correct and that you have access to it.' + ) + + # Handle endpoint errors + if 'Could not connect to the endpoint URL' in str(e): + raise ValidationError( + 'Cannot connect to B2 endpoint. ' + 'Please verify your B2 endpoint URL is correct (e.g., https://s3.us-west-004.backblazeb2.com).' + ) + + # Generic error + raise ValidationError(f'Error connecting to B2: {str(e)}') + + except TypeError as e: + logger.info(f'It seems B2 access keys are incorrect: {e}', exc_info=True) + raise ValidationError( + 'It seems B2 access keys are incorrect. ' + 'Please check your B2 Application Key ID and Application Key.' + ) + except KeyError: + raise ValidationError( + f'{storage.url_scheme}://{storage.bucket}/{storage.prefix} not found. ' + 'Please verify the bucket and prefix are correct.' + ) + + return data + + +class B2ImportStorageSerializer(B2StorageSerializerMixin, ImportStorageSerializer): + """Serializer for B2 Import Storage.""" + + type = serializers.ReadOnlyField(default=os.path.basename(os.path.dirname(__file__))) + presign = serializers.BooleanField(required=False, default=True) + + class Meta: + model = B2ImportStorage + fields = '__all__' + + +class B2ExportStorageSerializer(B2StorageSerializerMixin, ExportStorageSerializer): + """Serializer for B2 Export Storage.""" + + type = serializers.ReadOnlyField(default=os.path.basename(os.path.dirname(__file__))) + + class Meta: + model = B2ExportStorage + fields = '__all__' + diff --git a/label_studio/io_storages/b2/utils.py b/label_studio/io_storages/b2/utils.py new file mode 100644 index 000000000000..94ae0a738836 --- /dev/null +++ b/label_studio/io_storages/b2/utils.py @@ -0,0 +1,304 @@ +"""This file and its contents are licensed under the Apache License 2.0. Please see the included NOTICE for copyright information and LICENSE for a copy of the license. 
+""" +import base64 +import fnmatch +import logging +import re +from typing import Optional, Tuple +from urllib.parse import urlparse + +import boto3 +from botocore.client import Config +from botocore.exceptions import ClientError, EndpointConnectionError +from core.utils.params import get_env +from django.conf import settings +from tldextract import TLDExtract + +logger = logging.getLogger(__name__) + +# B2 Connection Configuration +B2_CONNECT_TIMEOUT = int(get_env('B2_CONNECT_TIMEOUT', 60)) # Connection timeout in seconds +B2_READ_TIMEOUT = int(get_env('B2_READ_TIMEOUT', 60)) # Read timeout in seconds +B2_MAX_RETRIES = int(get_env('B2_MAX_RETRIES', 3)) # Maximum number of retry attempts + + +def get_client_and_resource( + b2_access_key_id: Optional[str] = None, + b2_secret_access_key: Optional[str] = None, + b2_endpoint_url: Optional[str] = None, + region_name: Optional[str] = None, +) -> Tuple: + """ + Create boto3 client and resource for Backblaze B2 Cloud Storage with production-ready configuration. + + B2 is S3-compatible, so we use boto3 with a custom endpoint URL. + Includes timeout, retry, and connection pool configuration for reliability. + + Args: + b2_access_key_id: B2 Application Key ID (equivalent to AWS access key) + b2_secret_access_key: B2 Application Key (equivalent to AWS secret key) + b2_endpoint_url: B2 endpoint URL (e.g., https://s3.us-west-004.backblazeb2.com) + region_name: B2 region name (e.g., us-west-004) + + Returns: + Tuple[boto3.client, boto3.resource]: Tuple of (boto3 S3 client, boto3 S3 resource) + + Raises: + ValueError: If credentials or endpoint URL are missing + EndpointConnectionError: If unable to connect to B2 endpoint + """ + # Read from environment variables if not provided + b2_access_key_id = b2_access_key_id or get_env('B2_ACCESS_KEY_ID') + b2_secret_access_key = b2_secret_access_key or get_env('B2_SECRET_ACCESS_KEY') + b2_endpoint_url = b2_endpoint_url or get_env('B2_ENDPOINT_URL') + region_name = region_name or get_env('B2_REGION') or 'us-west-004' + + # Validate required credentials + if not b2_access_key_id or not b2_secret_access_key: + raise ValueError( + 'B2 credentials are required. Please provide B2_ACCESS_KEY_ID and B2_SECRET_ACCESS_KEY ' + 'either as parameters or environment variables.' + ) + + logger.info( + f'Initializing Backblaze B2 connection: ' + f'endpoint={b2_endpoint_url}, ' + f'region={region_name}, ' + f'key_id={b2_access_key_id[:10]}***' + ) + + # Create boto3 session with B2 credentials + try: + session = boto3.Session( + aws_access_key_id=b2_access_key_id, + aws_secret_access_key=b2_secret_access_key, + ) + except Exception as e: + logger.error(f'Failed to create boto3 session: {e}', exc_info=True) + raise ValueError(f'Invalid B2 credentials: {e}') from e + + # B2 requires explicit endpoint URL + if not b2_endpoint_url: + # Default endpoint pattern for B2 + b2_endpoint_url = f'https://s3.{region_name}.backblazeb2.com' + logger.warning( + f'No B2 endpoint URL provided, using default: {b2_endpoint_url}. ' + 'For production, set B2_ENDPOINT_URL environment variable.' 
+ ) + + # Configure boto3 with timeout, retry, and connection pooling + boto_config = Config( + signature_version='s3v4', + connect_timeout=B2_CONNECT_TIMEOUT, + read_timeout=B2_READ_TIMEOUT, + retries={ + 'max_attempts': B2_MAX_RETRIES, + 'mode': 'adaptive', # Adaptive retry mode for better resilience + }, + max_pool_connections=50, # Connection pooling for performance + ) + + settings_dict = { + 'region_name': region_name, + 'endpoint_url': b2_endpoint_url, + } + + try: + # Create S3-compatible client and resource for B2 + client = session.client('s3', config=boto_config, **settings_dict) + resource = session.resource('s3', config=boto_config, **settings_dict) + + logger.info( + f'B2 client created successfully with timeout={B2_CONNECT_TIMEOUT}s, ' + f'max_retries={B2_MAX_RETRIES}' + ) + + return client, resource + + except EndpointConnectionError as e: + logger.error( + f'Failed to connect to B2 endpoint {b2_endpoint_url}: {e}. ' + 'Please verify the endpoint URL is correct and accessible.', + exc_info=True + ) + raise + except Exception as e: + logger.error(f'Unexpected error creating B2 client: {e}', exc_info=True) + raise + + +def resolve_b2_url(url: str, client, presign: bool = True, expires_in: int = 3600) -> str: + """ + Resolve B2 URL to either presigned URL or base64 encoded data. + + This function handles conversion of b2:// URLs to accessible HTTP(S) URLs or inline data. + + Args: + url: The b2:// URL to resolve (e.g., "b2://my-bucket/path/to/file.jpg") + client: boto3 S3 client for B2 + presign: If True, generate presigned URL; if False, return base64 data + expires_in: Presigned URL expiration time in seconds (default: 3600 = 1 hour) + + Returns: + str: Either a presigned HTTPS URL or base64-encoded data URL + + Raises: + ClientError: If unable to access the object in B2 + """ + try: + r = urlparse(url, allow_fragments=False) + bucket_name = r.netloc + key = r.path.lstrip('/') + + logger.debug(f'Resolving B2 URL: bucket={bucket_name}, key={key}, presign={presign}') + + # Return blob as base64 encoded string if presigned urls are disabled + if not presign: + logger.info(f'Fetching object from B2 for base64 encoding: {bucket_name}/{key}') + obj = client.get_object(Bucket=bucket_name, Key=key) + content_type = obj['ResponseMetadata']['HTTPHeaders'].get('content-type', 'application/octet-stream') + object_data = obj['Body'].read() + object_b64 = 'data:' + content_type + ';base64,' + base64.b64encode(object_data).decode('utf-8') + logger.debug(f'Generated base64 data URL for {key} ({len(object_data)} bytes)') + return object_b64 + + # Otherwise try to generate presigned url + try: + presigned_url = client.generate_presigned_url( + ClientMethod='get_object', + Params={'Bucket': bucket_name, 'Key': key}, + ExpiresIn=expires_in + ) + logger.info(f'Generated presigned URL for {bucket_name}/{key} (expires in {expires_in}s)') + return presigned_url + except ClientError as exc: + logger.warning( + f"Failed to generate presigned URL for B2 object {bucket_name}/{key}: {exc}. " + "Returning original URL as fallback." + ) + return url + + except Exception as e: + logger.error(f'Error resolving B2 URL {url}: {e}', exc_info=True) + return url # Fallback to original URL + + +class B2(object): + """Helper class for Backblaze B2 Cloud Storage operations.""" + + @classmethod + def get_blob_metadata( + cls, + url: str, + bucket_name: str, + client=None, + b2_access_key_id=None, + b2_secret_access_key=None, + b2_endpoint_url=None, + region_name=None, + ): + """ + Get blob metadata from B2 by URL. 
+ + Args: + url: Object key + bucket_name: B2 bucket name + client: B2 client for batch processing (optional) + b2_access_key_id: B2 Application Key ID + b2_secret_access_key: B2 Application Key + b2_endpoint_url: B2 endpoint URL + region_name: B2 region name + + Returns: + Object metadata dict + """ + if client is None: + client, _ = get_client_and_resource( + b2_access_key_id=b2_access_key_id, + b2_secret_access_key=b2_secret_access_key, + b2_endpoint_url=b2_endpoint_url, + region_name=region_name, + ) + obj = client.get_object(Bucket=bucket_name, Key=url) + metadata = dict(obj) + # remove unused fields + metadata.pop('Body', None) + metadata.pop('ResponseMetadata', None) + return metadata + + @classmethod + def validate_pattern(cls, storage, pattern, glob_pattern=True): + """ + Validate pattern against B2 Storage. + + Args: + storage: B2 Storage instance + pattern: Pattern to validate + glob_pattern: If True, pattern is a glob pattern, otherwise it is a regex pattern + + Returns: + Message if pattern is not valid, empty string otherwise + """ + client, bucket = storage.get_client_and_bucket() + if glob_pattern: + pattern = fnmatch.translate(pattern) + regex = re.compile(pattern) + + if storage.prefix: + list_kwargs = {'Prefix': storage.prefix.rstrip('/') + '/'} + if not storage.recursive_scan: + list_kwargs['Delimiter'] = '/' + bucket_iter = bucket.objects.filter(**list_kwargs) + else: + bucket_iter = bucket.objects + + bucket_iter = bucket_iter.page_size(settings.CLOUD_STORAGE_CHECK_FOR_RECORDS_PAGE_SIZE).all() + + for index, obj in enumerate(bucket_iter): + key = obj.key + # skip directories + if key.endswith('/'): + logger.debug(key + ' is skipped because it is a folder') + continue + if regex and regex.match(key): + logger.debug(key + ' matches file pattern') + return '' + return 'No objects found matching the provided glob pattern' + + +class B2StorageError(Exception): + """Exception raised for B2 storage-specific errors.""" + pass + + +# see https://github.com/john-kurkowski/tldextract?tab=readme-ov-file#note-about-caching +# prevents network call on first use +extractor = TLDExtract(suffix_list_urls=()) + + +def catch_and_reraise_from_none(func): + """ + For B2 storages - if b2_endpoint_url is not on a known domain, catch exception and + raise a new one with the previous context suppressed. See also: https://peps.python.org/pep-0409/ + + This decorator is specifically designed for B2 Cloud Storage to handle errors gracefully + when using custom endpoint URLs. + """ + + def wrapper(self, *args, **kwargs): + try: + return func(self, *args, **kwargs) + except Exception as e: + if self.b2_endpoint_url and ( + domain := extractor.extract_urllib(urlparse(self.b2_endpoint_url)).registered_domain.lower() + ) not in [trusted_domain.lower() for trusted_domain in settings.B2_TRUSTED_STORAGE_DOMAINS]: + logger.error(f'Exception from unrecognized B2 domain: {e}', exc_info=True) + raise B2StorageError( + f'Debugging info is not available for B2 endpoints on domain: {domain}. ' + 'Please contact your Label Studio devops team if you require detailed error reporting for this domain.' 
+ ) from None + else: + raise e + + return wrapper + diff --git a/label_studio/io_storages/base_models.py b/label_studio/io_storages/base_models.py index 135bfc1ae982..10bc29074f88 100644 --- a/label_studio/io_storages/base_models.py +++ b/label_studio/io_storages/base_models.py @@ -571,7 +571,7 @@ def _scan_and_create_links(self, link_class): raise UnsupportedFileFormatError( f'File "{key}" is not a JSON/JSONL/Parquet file. Only .json, .jsonl, and .parquet files can be processed.\n' f"If you're trying to import non-JSON data (images, audio, text, etc.), " - f'edit storage settings and enable "Tasks" import method' + f'edit storage settings and enable "Files" import method (use_blob_urls=True)' ) try: @@ -581,7 +581,7 @@ def _scan_and_create_links(self, link_class): raise ValueError( f'Error loading JSON from file "{key}".\nIf you\'re trying to import non-JSON data ' f'(images, audio, text, etc.), edit storage settings and enable ' - f'"Tasks" import method' + f'"Files" import method (use_blob_urls=True)' ) for link_object in link_objects: diff --git a/label_studio/io_storages/functions.py b/label_studio/io_storages/functions.py index e2a11a3a6601..5d29f8a0d7a6 100644 --- a/label_studio/io_storages/functions.py +++ b/label_studio/io_storages/functions.py @@ -6,6 +6,7 @@ from rest_framework.exceptions import PermissionDenied, ValidationError from .azure_blob.api import AzureBlobExportStorageListAPI, AzureBlobImportStorageListAPI +from .b2.api import B2ExportStorageListAPI, B2ImportStorageListAPI from .gcs.api import GCSExportStorageListAPI, GCSImportStorageListAPI from .redis.api import RedisExportStorageListAPI, RedisImportStorageListAPI from .s3.api import S3ExportStorageListAPI, S3ImportStorageListAPI @@ -72,6 +73,12 @@ def get_storage_list(): 'import_list_api': S3ImportStorageListAPI, 'export_list_api': S3ExportStorageListAPI, }, + { + 'name': 'b2', + 'title': 'Backblaze B2', + 'import_list_api': B2ImportStorageListAPI, + 'export_list_api': B2ExportStorageListAPI, + }, { 'name': 'gcs', 'title': 'Google Cloud Storage', diff --git a/label_studio/io_storages/migrations/0022_add_b2_storage_models.py b/label_studio/io_storages/migrations/0022_add_b2_storage_models.py new file mode 100644 index 000000000000..698a4e0e8861 --- /dev/null +++ b/label_studio/io_storages/migrations/0022_add_b2_storage_models.py @@ -0,0 +1,125 @@ +# Generated manually for Backblaze B2 Cloud Storage integration + +import django.db.models.deletion +import django.utils.timezone +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('projects', '0031_alter_project_show_ground_truth_first'), + ('tasks', '0057_annotation_proj_result_octlen_idx_async'), + ('io_storages', '0021_azureblobimportstorage_recursive_scan_and_more'), + ] + + operations = [ + # Create B2 Import Storage + migrations.CreateModel( + name='B2ImportStorage', + fields=[ + ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + # StorageInfo fields + ('last_sync', models.DateTimeField(blank=True, help_text='Last sync finished time', null=True, verbose_name='last sync')), + ('last_sync_count', models.PositiveIntegerField(blank=True, help_text='Count of tasks synced last time', null=True, verbose_name='last sync count')), + ('last_sync_job', models.CharField(blank=True, help_text='Last sync job ID', max_length=256, null=True, verbose_name='last_sync_job')), + ('status', models.CharField(choices=[('initialized', 'Initialized'), ('queued', 'Queued'), 
diff --git a/label_studio/io_storages/migrations/0022_add_b2_storage_models.py b/label_studio/io_storages/migrations/0022_add_b2_storage_models.py
new file mode 100644
index 000000000000..698a4e0e8861
--- /dev/null
+++ b/label_studio/io_storages/migrations/0022_add_b2_storage_models.py
@@ -0,0 +1,125 @@
+# Generated manually for Backblaze B2 Cloud Storage integration
+
+import django.db.models.deletion
+import django.utils.timezone
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ('projects', '0031_alter_project_show_ground_truth_first'),
+        ('tasks', '0057_annotation_proj_result_octlen_idx_async'),
+        ('io_storages', '0021_azureblobimportstorage_recursive_scan_and_more'),
+    ]
+
+    operations = [
+        # Create B2 Import Storage
+        migrations.CreateModel(
+            name='B2ImportStorage',
+            fields=[
+                ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
+                # StorageInfo fields
+                ('last_sync', models.DateTimeField(blank=True, help_text='Last sync finished time', null=True, verbose_name='last sync')),
+                ('last_sync_count', models.PositiveIntegerField(blank=True, help_text='Count of tasks synced last time', null=True, verbose_name='last sync count')),
+                ('last_sync_job', models.CharField(blank=True, help_text='Last sync job ID', max_length=256, null=True, verbose_name='last_sync_job')),
+                ('status', models.CharField(choices=[('initialized', 'Initialized'), ('queued', 'Queued'), ('in_progress', 'In progress'), ('failed', 'Failed'), ('completed', 'Completed'), ('completed_with_errors', 'Completed with errors')], default='initialized', max_length=64)),
+                ('traceback', models.TextField(blank=True, help_text='Traceback report for the last failed sync', null=True)),
+                ('meta', models.JSONField(default=dict, help_text='Meta and debug information about storage processes', null=True, verbose_name='meta')),
+                # Storage fields
+                ('title', models.CharField(blank=True, help_text='Cloud storage title', max_length=256, null=True, verbose_name='title')),
+                ('description', models.TextField(blank=True, help_text='Cloud storage description', null=True, verbose_name='description')),
+                ('created_at', models.DateTimeField(auto_now_add=True, help_text='Creation time', verbose_name='created at')),
+                ('synchronizable', models.BooleanField(default=True, help_text='If storage can be synced', verbose_name='synchronizable')),
+                # B2StorageMixin fields
+                ('bucket', models.TextField(blank=True, help_text='B2 bucket name', null=True, verbose_name='bucket')),
+                ('prefix', models.TextField(blank=True, help_text='B2 bucket prefix (folder path)', null=True, verbose_name='prefix')),
+                ('regex_filter', models.TextField(blank=True, help_text='Cloud storage regex for filtering objects', null=True, verbose_name='regex_filter')),
+                ('use_blob_urls', models.BooleanField(default=False, help_text='Interpret objects as BLOBs and generate URLs', verbose_name='use_blob_urls')),
+                ('b2_access_key_id', models.TextField(blank=True, help_text='B2 Application Key ID (equivalent to AWS_ACCESS_KEY_ID)', null=True, verbose_name='b2_access_key_id')),
+                ('b2_secret_access_key', models.TextField(blank=True, help_text='B2 Application Key (equivalent to AWS_SECRET_ACCESS_KEY)', null=True, verbose_name='b2_secret_access_key')),
+                ('b2_endpoint_url', models.TextField(blank=True, help_text='B2 S3-compatible endpoint URL (e.g., https://s3.us-west-004.backblazeb2.com)', null=True, verbose_name='b2_endpoint_url')),
+                ('region_name', models.TextField(blank=True, help_text='B2 Region (e.g., us-west-004, us-east-005, eu-central-003)', null=True, verbose_name='region_name')),
+                # B2ImportStorageBase fields
+                ('presign', models.BooleanField(default=True, help_text='Generate presigned URLs', verbose_name='presign')),
+                ('presign_ttl', models.PositiveSmallIntegerField(default=1, help_text='Presigned URLs TTL (in minutes)', verbose_name='presign_ttl')),
+                ('recursive_scan', models.BooleanField(default=False, help_text='Perform recursive scan over the bucket content', verbose_name='recursive scan')),
+                # ProjectStorageMixin fields
+                ('project', models.ForeignKey(help_text='A unique integer value identifying this project.', on_delete=django.db.models.deletion.CASCADE, related_name='io_storages_b2importstorages', to='projects.project')),
+            ],
+            options={
+                'abstract': False,
+            },
+        ),
+
+        # Create B2 Export Storage
+        migrations.CreateModel(
+            name='B2ExportStorage',
+            fields=[
+                ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
+                # StorageInfo fields
+                ('last_sync', models.DateTimeField(blank=True, help_text='Last sync finished time', null=True, verbose_name='last sync')),
+                ('last_sync_count', models.PositiveIntegerField(blank=True, help_text='Count of tasks synced last time', null=True, verbose_name='last sync count')),
+                ('last_sync_job', models.CharField(blank=True, help_text='Last sync job ID', max_length=256, null=True, verbose_name='last_sync_job')),
+                ('status', models.CharField(choices=[('initialized', 'Initialized'), ('queued', 'Queued'), ('in_progress', 'In progress'), ('failed', 'Failed'), ('completed', 'Completed'), ('completed_with_errors', 'Completed with errors')], default='initialized', max_length=64)),
+                ('traceback', models.TextField(blank=True, help_text='Traceback report for the last failed sync', null=True)),
+                ('meta', models.JSONField(default=dict, help_text='Meta and debug information about storage processes', null=True, verbose_name='meta')),
+                # Storage fields
+                ('title', models.CharField(blank=True, help_text='Cloud storage title', max_length=256, null=True, verbose_name='title')),
+                ('description', models.TextField(blank=True, help_text='Cloud storage description', null=True, verbose_name='description')),
+                ('created_at', models.DateTimeField(auto_now_add=True, help_text='Creation time', verbose_name='created at')),
+                ('synchronizable', models.BooleanField(default=True, help_text='If storage can be synced', verbose_name='synchronizable')),
+                # ExportStorage fields
+                ('can_delete_objects', models.BooleanField(blank=True, help_text='Deletion from storage enabled', null=True, verbose_name='can_delete_objects')),
+                # B2StorageMixin fields
+                ('bucket', models.TextField(blank=True, help_text='B2 bucket name', null=True, verbose_name='bucket')),
+                ('prefix', models.TextField(blank=True, help_text='B2 bucket prefix (folder path)', null=True, verbose_name='prefix')),
+                ('regex_filter', models.TextField(blank=True, help_text='Cloud storage regex for filtering objects', null=True, verbose_name='regex_filter')),
+                ('use_blob_urls', models.BooleanField(default=False, help_text='Interpret objects as BLOBs and generate URLs', verbose_name='use_blob_urls')),
+                ('b2_access_key_id', models.TextField(blank=True, help_text='B2 Application Key ID (equivalent to AWS_ACCESS_KEY_ID)', null=True, verbose_name='b2_access_key_id')),
+                ('b2_secret_access_key', models.TextField(blank=True, help_text='B2 Application Key (equivalent to AWS_SECRET_ACCESS_KEY)', null=True, verbose_name='b2_secret_access_key')),
+                ('b2_endpoint_url', models.TextField(blank=True, help_text='B2 S3-compatible endpoint URL (e.g., https://s3.us-west-004.backblazeb2.com)', null=True, verbose_name='b2_endpoint_url')),
+                ('region_name', models.TextField(blank=True, help_text='B2 Region (e.g., us-west-004, us-east-005, eu-central-003)', null=True, verbose_name='region_name')),
+                # ProjectStorageMixin fields
+                ('project', models.ForeignKey(help_text='A unique integer value identifying this project.', on_delete=django.db.models.deletion.CASCADE, related_name='io_storages_b2exportstorages', to='projects.project')),
+            ],
+            options={
+                'abstract': False,
+            },
+        ),
+
+        # Create B2 Import Storage Link
+        migrations.CreateModel(
+            name='B2ImportStorageLink',
+            fields=[
+                ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
+                ('key', models.TextField(help_text='External link key', verbose_name='key')),
+                ('object_exists', models.BooleanField(default=True, help_text='Whether object under external link still exists', verbose_name='object exists')),
+                ('created_at', models.DateTimeField(auto_now_add=True, help_text='Creation time', verbose_name='created at')),
+                ('row_group', models.IntegerField(blank=True, help_text='Parquet row group', null=True)),
+                ('row_index', models.IntegerField(blank=True, help_text='Parquet row index, or JSON[L] object index', null=True)),
+                ('task', models.OneToOneField(on_delete=django.db.models.deletion.CASCADE, related_name='io_storages_b2importstoragelink', to='tasks.task')),
+                ('storage', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='links', to='io_storages.b2importstorage')),
+            ],
+            options={
+                'abstract': False,
+            },
+        ),
+
+        # Create B2 Export Storage Link
+        migrations.CreateModel(
+            name='B2ExportStorageLink',
+            fields=[
+                ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
+                ('object_exists', models.BooleanField(default=True, help_text='Whether object under external link still exists', verbose_name='object exists')),
+                ('created_at', models.DateTimeField(auto_now_add=True, help_text='Creation time', verbose_name='created at')),
+                ('updated_at', models.DateTimeField(auto_now=True, help_text='Update time', verbose_name='updated at')),
+                ('annotation', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='io_storages_b2exportstoragelink', to='tasks.annotation')),
+                ('storage', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='links', to='io_storages.b2exportstorage')),
+            ],
+            options={
+                'abstract': False,
+            },
+        ),
+    ]
diff --git a/label_studio/io_storages/migrations/0023_alter_b2exportstorage_project_and_more.py b/label_studio/io_storages/migrations/0023_alter_b2exportstorage_project_and_more.py
new file mode 100644
index 000000000000..b173fa408b87
--- /dev/null
+++ b/label_studio/io_storages/migrations/0023_alter_b2exportstorage_project_and_more.py
@@ -0,0 +1,54 @@
+# Generated by Django 5.1.13 on 2025-10-21 16:42
+
+import django.db.models.deletion
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ("io_storages", "0022_add_b2_storage_models"),
+        ("projects", "0031_alter_project_show_ground_truth_first"),
+        ("tasks", "0058_task_precomputed_agreement"),
+    ]
+
+    operations = [
+        migrations.AlterField(
+            model_name="b2exportstorage",
+            name="project",
+            field=models.ForeignKey(
+                help_text="A unique integer value identifying this project.",
+                on_delete=django.db.models.deletion.CASCADE,
+                related_name="%(app_label)s_%(class)ss",
+                to="projects.project",
+            ),
+        ),
+        migrations.AlterField(
+            model_name="b2exportstoragelink",
+            name="annotation",
+            field=models.ForeignKey(
+                on_delete=django.db.models.deletion.CASCADE,
+                related_name="%(app_label)s_%(class)s",
+                to="tasks.annotation",
+            ),
+        ),
+        migrations.AlterField(
+            model_name="b2importstorage",
+            name="project",
+            field=models.ForeignKey(
+                help_text="A unique integer value identifying this project.",
+                on_delete=django.db.models.deletion.CASCADE,
+                related_name="%(app_label)s_%(class)ss",
+                to="projects.project",
+            ),
+        ),
+        migrations.AlterField(
+            model_name="b2importstoragelink",
+            name="task",
+            field=models.OneToOneField(
+                on_delete=django.db.models.deletion.CASCADE,
+                related_name="%(app_label)s_%(class)s",
+                to="tasks.task",
+            ),
+        ),
+    ]
diff --git a/label_studio/io_storages/models.py b/label_studio/io_storages/models.py
index 98264925f001..1e8ed5ee8e55 100644
--- a/label_studio/io_storages/models.py
+++ b/label_studio/io_storages/models.py
@@ -8,6 +8,12 @@
     AzureBlobExportStorage,
     AzureBlobExportStorageLink,
 )
+from .b2.models import (  # noqa: F401
+    B2ImportStorage,
+    B2ImportStorageLink,
+    B2ExportStorage,
+    B2ExportStorageLink,
+)
 from .s3.models import (  # noqa: F401
     S3ImportStorage,
     S3ImportStorageLink,
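With both migrations and the re-export in `models.py` in place, a quick smoke check from a Django shell should confirm the new models are wired up. A sketch, assuming a configured Label Studio dev environment (DJANGO_SETTINGS_MODULE set, migrations applied):

```python
import django

django.setup()  # requires DJANGO_SETTINGS_MODULE to point at the Label Studio settings

from io_storages.models import B2ExportStorage, B2ImportStorage

# Default Django table naming for app models
print(B2ImportStorage._meta.db_table)  # expected: io_storages_b2importstorage
print(B2ExportStorage._meta.db_table)  # expected: io_storages_b2exportstorage

# The B2-specific credential fields added by migration 0022
print([f.name for f in B2ImportStorage._meta.get_fields() if f.name.startswith('b2_')])
```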
a/label_studio/io_storages/urls.py
+++ b/label_studio/io_storages/urls.py
@@ -23,6 +23,19 @@
     AzureBlobImportStorageSyncAPI,
     AzureBlobImportStorageValidateAPI,
 )
+from io_storages.b2.api import (
+    B2ExportStorageDetailAPI,
+    B2ExportStorageFormLayoutAPI,
+    B2ExportStorageListAPI,
+    B2ExportStorageSyncAPI,
+    B2ExportStorageValidateAPI,
+    B2ImportStorageDetailAPI,
+    B2ImportStorageFormLayoutAPI,
+    B2ImportStorageListAPI,
+    B2ImportStorageSerializer,
+    B2ImportStorageSyncAPI,
+    B2ImportStorageValidateAPI,
+)
 from io_storages.gcs.api import (
     GCSExportStorageDetailAPI,
     GCSExportStorageFormLayoutAPI,
@@ -101,6 +114,22 @@
     path('export/s3/<int:pk>/sync', S3ExportStorageSyncAPI.as_view(), name='export-storage-s3-sync'),
     path('export/s3/validate', S3ExportStorageValidateAPI.as_view(), name='export-storage-s3-validate'),
     path('export/s3/form', S3ExportStorageFormLayoutAPI.as_view(), name='export-storage-s3-form'),
+    # Backblaze B2
+    path('b2/', B2ImportStorageListAPI.as_view(), name='storage-b2-list'),
+    path('b2/<int:pk>', B2ImportStorageDetailAPI.as_view(), name='storage-b2-detail'),
+    path('b2/<int:pk>/sync', B2ImportStorageSyncAPI.as_view(), name='storage-b2-sync'),
+    path('b2/validate', B2ImportStorageValidateAPI.as_view(), name='storage-b2-validate'),
+    path('b2/form', B2ImportStorageFormLayoutAPI.as_view(), name='storage-b2-form'),
+    path(
+        'b2/files',
+        ImportStorageListFilesAPI.as_view(serializer_class=B2ImportStorageSerializer),
+        name='storage-b2-list-files',
+    ),
+    path('export/b2', B2ExportStorageListAPI.as_view(), name='export-storage-b2-list'),
+    path('export/b2/<int:pk>', B2ExportStorageDetailAPI.as_view(), name='export-storage-b2-detail'),
+    path('export/b2/<int:pk>/sync', B2ExportStorageSyncAPI.as_view(), name='export-storage-b2-sync'),
+    path('export/b2/validate', B2ExportStorageValidateAPI.as_view(), name='export-storage-b2-validate'),
+    path('export/b2/form', B2ExportStorageFormLayoutAPI.as_view(), name='export-storage-b2-form'),
     # Microsoft Azure
     path('azure/', AzureBlobImportStorageListAPI.as_view(), name='storage-azure-list'),
     path('azure/<int:pk>', AzureBlobImportStorageDetailAPI.as_view(), name='storage-azure-detail'),
diff --git a/label_studio/tests/io_storages/b2/__init__.py b/label_studio/tests/io_storages/b2/__init__.py
new file mode 100644
index 000000000000..570df5884401
--- /dev/null
+++ b/label_studio/tests/io_storages/b2/__init__.py
@@ -0,0 +1 @@
+# Backblaze B2 Storage Tests
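The new routes can also be exercised end to end over plain HTTP. A sketch using `requests` (host, token, project id, and credentials are placeholders; the payload mirrors the fields used in the tests below):

```python
import requests

BASE = 'http://localhost:8080'
HEADERS = {'Authorization': 'Token YOUR_API_TOKEN'}

payload = {
    'project': 1,
    'title': 'B2 imports',
    'bucket': 'my-b2-bucket',
    'use_blob_urls': True,
    'b2_access_key_id': 'YOUR_B2_APPLICATION_KEY_ID',
    'b2_secret_access_key': 'YOUR_B2_APPLICATION_KEY',
    'b2_endpoint_url': 'https://s3.us-west-004.backblazeb2.com',
    'region_name': 'us-west-004',
}

# Validate credentials without persisting anything
requests.post(f'{BASE}/api/storages/b2/validate', json=payload, headers=HEADERS).raise_for_status()

# Create the import storage, then trigger a sync
storage = requests.post(f'{BASE}/api/storages/b2?project=1', json=payload, headers=HEADERS).json()
requests.post(f"{BASE}/api/storages/b2/{storage['id']}/sync", headers=HEADERS).raise_for_status()
```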
diff --git a/label_studio/tests/io_storages/b2/test_models.py b/label_studio/tests/io_storages/b2/test_models.py
new file mode 100644
index 000000000000..09729f6dde2a
--- /dev/null
+++ b/label_studio/tests/io_storages/b2/test_models.py
@@ -0,0 +1,157 @@
+import json
+from unittest.mock import MagicMock, patch
+
+import pytest
+from io_storages.b2.models import B2ExportStorage, B2ImportStorage
+from tasks.models import Annotation
+from tests.utils import make_project, make_task
+
+
+@pytest.mark.django_db
+def test_b2_import_storage_creation(business_client):
+    """Test creating B2 import storage with valid credentials"""
+    project = make_project({}, business_client.user, use_ml_backend=False)
+
+    data = {
+        'project': project.id,
+        'title': 'Test B2 Import',
+        'bucket': 'test-bucket',
+        'prefix': 'test-prefix/',
+        'regex_filter': '',
+        'use_blob_urls': True,
+        'presign': True,
+        'presign_ttl': 15,
+        'b2_access_key_id': 'test_key_id',
+        'b2_secret_access_key': 'test_secret',
+        'b2_endpoint_url': 'https://s3.us-west-004.backblazeb2.com',
+        'region_name': 'us-west-004',
+    }
+
+    with patch('io_storages.b2.models.B2ImportStorage.validate_connection'):
+        r = business_client.post(
+            f'/api/storages/b2?project={project.id}', data=json.dumps(data), content_type='application/json'
+        )
+    assert r.status_code == 201
+    assert r.json()['bucket'] == 'test-bucket'
+    assert r.json()['b2_endpoint_url'] == 'https://s3.us-west-004.backblazeb2.com'
+
+
+@pytest.mark.django_db
+def test_b2_export_storage_creation(business_client):
+    """Test creating B2 export storage with valid credentials"""
+    project = make_project({}, business_client.user, use_ml_backend=False)
+
+    data = {
+        'project': project.id,
+        'title': 'Test B2 Export',
+        'bucket': 'test-bucket',
+        'prefix': 'exports/',
+        'b2_access_key_id': 'test_key_id',
+        'b2_secret_access_key': 'test_secret',
+        'b2_endpoint_url': 'https://s3.us-west-004.backblazeb2.com',
+        'region_name': 'us-west-004',
+        'can_delete_objects': False,
+    }
+
+    with patch('io_storages.b2.models.B2ExportStorage.validate_connection'):
+        r = business_client.post(
+            f'/api/storages/export/b2?project={project.id}', data=json.dumps(data), content_type='application/json'
+        )
+    assert r.status_code == 201
+    assert r.json()['bucket'] == 'test-bucket'
+    assert r.json()['prefix'] == 'exports/'
+
+
+@pytest.mark.django_db
+def test_b2_storage_missing_credentials(business_client):
+    """Test that B2 storage creation fails without credentials"""
+    project = make_project({}, business_client.user, use_ml_backend=False)
+
+    data = {
+        'project': project.id,
+        'title': 'Test B2',
+        'bucket': 'test-bucket',
+        'b2_endpoint_url': 'https://s3.us-west-004.backblazeb2.com',
+        # Missing b2_access_key_id and b2_secret_access_key
+    }
+
+    r = business_client.post(
+        f'/api/storages/b2?project={project.id}', data=json.dumps(data), content_type='application/json'
+    )
+    assert r.status_code == 400
+
+
+@pytest.mark.django_db
+def test_b2_storage_invalid_endpoint(business_client):
+    """Test that B2 storage creation fails with an invalid endpoint"""
+    project = make_project({}, business_client.user, use_ml_backend=False)
+
+    data = {
+        'project': project.id,
+        'title': 'Test B2',
+        'bucket': 'test-bucket',
+        'b2_access_key_id': 'test_key_id',
+        'b2_secret_access_key': 'test_secret',
+        'b2_endpoint_url': 'invalid-url',  # Invalid URL
+    }
+
+    r = business_client.post(
+        f'/api/storages/b2?project={project.id}', data=json.dumps(data), content_type='application/json'
+    )
+    assert r.status_code == 400
+
+
+@pytest.mark.django_db
+def test_b2_export_annotation_signal():
+    """Test that annotations are exported to B2 storage on save"""
+    from io_storages.b2.models import B2ExportStorageLink
+
+    # Create project and export storage
+    project = make_project({}, None, use_ml_backend=False)
+    export_storage = B2ExportStorage.objects.create(
+        project=project,
+        title='Test Export',
+        bucket='test-bucket',
+        b2_access_key_id='test_key',
+        b2_secret_access_key='test_secret',
+        b2_endpoint_url='https://s3.us-west-004.backblazeb2.com',
+    )
+
+    # Create task
+    task = make_task({'data': {}}, project)
+
+    # Mock the save_annotation method
+    with patch.object(B2ExportStorage, 'save_annotation') as mock_save:
+        # Create annotation
+        annotation = Annotation.objects.create(task=task, project=project, result=[])
+
+        # Verify save_annotation was called
+        mock_save.assert_called_once()
+
+    # Verify export link was created
+    link = B2ExportStorageLink.objects.filter(annotation=annotation, storage=export_storage).first()
+    assert link is not None
+
+
+@pytest.mark.django_db
+def test_b2_import_storage_get_data():
+    """Test B2 import storage get_data method"""
+    project = make_project({}, None, use_ml_backend=False)
+    import_storage = B2ImportStorage.objects.create(
+        project=project,
+        title='Test Import',
+        bucket='test-bucket',
+        b2_access_key_id='test_key',
+        b2_secret_access_key='test_secret',
+        b2_endpoint_url='https://s3.us-west-004.backblazeb2.com',
+    )
+
+    # Mock boto3 client
+    mock_client = MagicMock()
+    mock_client.get_object.return_value = {'Body': MagicMock(read=lambda: b'{"test": "data"}')}
+
+    with patch.object(import_storage, 'get_client_and_bucket', return_value=(mock_client, None)):
+        data = import_storage.get_data('test-key')
+        assert data == b'{"test": "data"}'
+        mock_client.get_object.assert_called_once()
diff --git a/label_studio/tests/io_storages/b2/test_utils.py b/label_studio/tests/io_storages/b2/test_utils.py
new file mode 100644
index 000000000000..d06b94f8219b
--- /dev/null
+++ b/label_studio/tests/io_storages/b2/test_utils.py
@@ -0,0 +1,44 @@
+from unittest.mock import patch
+
+import pytest
+from django.test import override_settings
+from io_storages.b2.utils import B2StorageError, catch_and_reraise_from_none
+
+
+@override_settings(B2_TRUSTED_STORAGE_DOMAINS=['backblazeb2.com', 'backblaze.com'])
+def test_catch_and_reraise_from_none_with_untrusted_domain():
+    class TestClass:
+        b2_endpoint_url = 'http://untrusted-domain.com'
+
+    instance = TestClass()
+
+    @catch_and_reraise_from_none
+    def function_to_test(self):
+        raise Exception('Original Exception')
+
+    with patch('io_storages.b2.utils.extractor.extract_urllib') as mock_extract:
+        mock_extract.return_value.registered_domain = 'untrusted-domain.com'
+        with pytest.raises(B2StorageError) as excinfo:
+            function_to_test(instance)
+        assert 'Debugging info is not available for B2 endpoints on domain: untrusted-domain.com' in str(
+            excinfo.value
+        )
+
+
+@override_settings(B2_TRUSTED_STORAGE_DOMAINS=['backblazeb2.com', 'backblaze.com'])
+def test_catch_and_reraise_from_none_with_trusted_domain():
+    class TestClass:
+        b2_endpoint_url = 'https://s3.us-west-004.backblazeb2.com'
+
+    instance = TestClass()
+
+    @catch_and_reraise_from_none
+    def function_to_test(self):
+        raise Exception('Original Exception')
+
+    with patch('io_storages.b2.utils.extractor.extract_urllib') as mock_extract:
+        mock_extract.return_value.registered_domain = 'backblazeb2.com'
+        with pytest.raises(Exception) as excinfo:
+            function_to_test(instance)
+        assert 'Original Exception' in str(excinfo.value)
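While iterating on this PR, the new test modules can be run in isolation. A sketch using pytest's programmatic entry point (paths as introduced above, run from the repository root):

```python
import sys

import pytest

# Run only the B2 storage test modules added in this PR, verbosely
sys.exit(pytest.main([
    'label_studio/tests/io_storages/b2/test_models.py',
    'label_studio/tests/io_storages/b2/test_utils.py',
    '-v',
]))
```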
diff --git a/web/apps/labelstudio/src/pages/Settings/StorageSettings/providers/b2.tsx b/web/apps/labelstudio/src/pages/Settings/StorageSettings/providers/b2.tsx
new file mode 100644
index 000000000000..c123761a2f50
--- /dev/null
+++ b/web/apps/labelstudio/src/pages/Settings/StorageSettings/providers/b2.tsx
@@ -0,0 +1,129 @@
+import { z } from "zod";
+import type { ProviderConfig } from "@humansignal/app-common/blocks/StorageProviderForm/types/provider";
+import { IconCloudProviderBackblaze } from "@humansignal/icons";
+
+/**
+ * Backblaze B2 Cloud Storage Provider Configuration
+ *
+ * B2 is S3-compatible, using boto3 with custom endpoint URLs.
+ * Users provide their B2 Application Key credentials and bucket details.
+ */
+export const b2Provider: ProviderConfig = {
+  name: "b2",
+  title: "Backblaze B2",
+  description: "Configure your Backblaze B2 Cloud Storage connection with S3-compatible settings",
+  icon: IconCloudProviderBackblaze,
+  fields: [
+    {
+      name: "bucket",
+      type: "text",
+      label: "Bucket Name",
+      required: true,
+      placeholder: "my-b2-bucket",
+      schema: z.string().min(1, "Bucket name is required"),
+      description: "Your Backblaze B2 bucket name",
+    },
+    {
+      name: "b2_endpoint_url",
+      type: "text",
+      label: "B2 Endpoint URL",
+      required: true,
+      placeholder: "https://s3.us-west-004.backblazeb2.com",
+      schema: z.string()
+        .min(1, "B2 Endpoint URL is required")
+        .url("Must be a valid URL")
+        .refine(
+          (url) => url.includes("backblazeb2.com") || url.includes("backblaze.com"),
+          "Endpoint URL must be a Backblaze B2 endpoint"
+        ),
+      description: "Your region-specific B2 S3-compatible endpoint (e.g., https://s3.us-west-004.backblazeb2.com)",
+    },
+    {
+      name: "region_name",
+      type: "text",
+      label: "Region Name",
+      placeholder: "us-west-004",
+      schema: z.string().optional().default("us-west-004"),
+      description: "B2 region (e.g., us-west-004, us-east-005, eu-central-003)",
+    },
+    {
+      name: "prefix",
+      type: "text",
+      label: "Bucket Prefix (Folder Path)",
+      placeholder: "path/to/files",
+      schema: z.string().optional().default(""),
+      target: "export",
+      description: "Optional folder path within the bucket",
+    },
+    {
+      name: "b2_access_key_id",
+      type: "password",
+      label: "Application Key ID",
+      required: true,
+      placeholder: "0051234567890abcdef",
+      autoComplete: "off",
+      accessKey: true,
+      schema: z.string().min(1, "B2 Application Key ID is required"),
+      description: "Your B2 Application Key ID (from Backblaze dashboard > App Keys)",
+    },
+    {
+      name: "b2_secret_access_key",
+      type: "password",
+      label: "Application Key",
+      required: true,
+      placeholder: "K001234567890abcdefghij",
+      autoComplete: "new-password",
+      accessKey: true,
+      schema: z.string().min(1, "B2 Application Key is required"),
+      description: "Your B2 Application Key (shown only once when created)",
+    },
+    {
+      name: "presign",
+      type: "toggle",
+      label: "Use pre-signed URLs (On) / Proxy through the platform (Off)",
+      description:
+        "When pre-signed URLs are enabled, all data bypasses the platform and user browsers directly read data from B2 storage",
+      schema: z.boolean().default(true),
+      target: "import",
+      resetConnection: false,
+    },
+    {
+      name: "presign_ttl",
+      type: "counter",
+      label: "Expire pre-signed URLs (minutes)",
+      min: 1,
+      max: 10080, // 7 days
+      step: 1,
+      schema: z.number().min(1).max(10080).default(15),
+      target: "import",
+      resetConnection: false,
+      dependsOn: {
+        field: "presign",
+        value: true,
+      },
+      description: "Time until pre-signed URLs expire (default: 15 minutes)",
+    },
+    {
+      name: "recursive_scan",
+      type: "toggle",
+      label: "Scan all sub-folders",
+      description: "When enabled, files from all nested folders will be imported",
+      schema: z.boolean().default(false),
+      target: "import",
+      resetConnection: false,
+    },
+  ],
+  layout: [
+    { fields: ["bucket"] },
+    { fields: ["b2_endpoint_url"] },
+    { fields: ["region_name"] },
+    { fields: ["prefix"] },
+    { fields: ["b2_access_key_id"] },
+    { fields: ["b2_secret_access_key"] },
+    { fields: ["presign", "presign_ttl"] },
+    { fields: ["recursive_scan"] },
+  ],
+};
+
+export default b2Provider;
diff --git a/web/apps/labelstudio/src/pages/Settings/StorageSettings/providers/index.ts b/web/apps/labelstudio/src/pages/Settings/StorageSettings/providers/index.ts
index 06d27f9462b6..0508f7f8fe7a 100644
--- a/web/apps/labelstudio/src/pages/Settings/StorageSettings/providers/index.ts
+++ b/web/apps/labelstudio/src/pages/Settings/StorageSettings/providers/index.ts
@@ -1,5 +1,6 @@
 import azureProvider from "./azure";
 import azureSpiProvider from "./azure_spi";
+import b2Provider from "./b2";
 import databricksProvider from "./databricks";
 import gcsProvider from "./gcs";
 import gcsWifProvider from "./gcswif";
@@ -11,6 +12,7 @@
 export const providers = {
   // Standard providers
   s3: s3Provider,
+  b2: b2Provider,
   gcs: gcsProvider,
   azure: azureProvider,
   redis: redisProvider,
diff --git a/web/libs/ui/src/assets/icons/cloud-provider-backblaze.svg b/web/libs/ui/src/assets/icons/cloud-provider-backblaze.svg
new file mode 100644
index 000000000000..5a72b8db785e
--- /dev/null
+++ b/web/libs/ui/src/assets/icons/cloud-provider-backblaze.svg
@@ -0,0 +1,5 @@
+
+
+
+
+
diff --git a/web/libs/ui/src/assets/icons/index.ts b/web/libs/ui/src/assets/icons/index.ts
index 589ca712c740..6bb93d0fd54b 100644
--- a/web/libs/ui/src/assets/icons/index.ts
+++ b/web/libs/ui/src/assets/icons/index.ts
@@ -270,3 +270,4 @@ export { ReactComponent as IconCloudProviderRedis } from "./cloud-provider-redis.svg";
 export { ReactComponent as IconCloudProviderGCS } from "./cloud-provider-gcs.svg";
 export { ReactComponent as IconCloudProviderAzure } from "./cloud-provider-azure.svg";
 export { ReactComponent as IconCloudProviderDatabricks } from "./cloud-provider-databricks.svg";
+export { ReactComponent as IconCloudProviderBackblaze } from "./cloud-provider-backblaze.svg";
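Since the whole integration rides on B2's S3-compatible API, credentials can also be sanity-checked outside Label Studio with plain boto3. A sketch (bucket name, prefix, region, and keys are placeholders):

```python
# Standalone check that a B2 Application Key can list a bucket via the
# S3-compatible API, using the same boto3 parameters the storage models use.
import boto3

client = boto3.client(
    's3',
    aws_access_key_id='YOUR_B2_APPLICATION_KEY_ID',
    aws_secret_access_key='YOUR_B2_APPLICATION_KEY',
    endpoint_url='https://s3.us-west-004.backblazeb2.com',
    region_name='us-west-004',
)

# Page through the bucket contents; requires the listFiles capability
paginator = client.get_paginator('list_objects_v2')
for page in paginator.paginate(Bucket='my-b2-bucket', Prefix='images/'):
    for obj in page.get('Contents', []):
        print(obj['Key'], obj['Size'])
```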