
Support for blobs synchronization #40271

Open
martinResearch opened this issue Mar 28, 2025 · 4 comments
Labels
Client: This issue points to a problem in the data-plane of the library.
customer-reported: Issues that are reported by GitHub users external to the Azure organization.
feature-request: This issue requires a new behavior in the product in order be resolved.
needs-team-attention: Workflow: This issue needs attention from Azure service team or SDK team.
Service Attention: Workflow: This issue is responsible by Azure service team.
Storage: Storage Service (Queues, Blobs, Files)

Comments

@martinResearch

Is your feature request related to a problem? Please describe.

I have many blobs under a folder in blob storage and would like to synchronize them with local copies, i.e. download the blobs that are missing locally or that have been modified in Azure storage since the last synchronization, based on the last modification date and/or the MD5 hash.

azcopy supports this feature through azcopy sync: https://learn.microsoft.com/en-us/azure/storage/common/storage-use-azcopy-blobs-synchronize
It would be great to have the same feature in the Python SDK for better integration with the Python ecosystem.

Describe the solution you'd like
I would like a method

def synchronize_blobs(local_folder: str, blobs_prefix: Optional[str] = None, blobs_list: Optional[list[str]] = None, md5: bool = True)

that can take either a prefix or an explicit list of blobs and synchronize them with files in a local folder.
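
For illustration, if such a method existed on ContainerClient, usage might look like the sketch below (this is a hypothetical API, not an existing SDK method; the container URL, prefix, and blob names are made up):

from azure.storage.blob import ContainerClient

container_client = ContainerClient.from_container_url("https://mydata.blob.core.windows.net/mycontainer?<SAS>")

# Download every blob under the prefix into ./data, skipping files whose MD5 already matches the remote blob.
container_client.synchronize_blobs(local_folder="./data", blobs_prefix="datasets/v1/", md5=True)

# Or synchronize an explicit list of blobs.
container_client.synchronize_blobs(local_folder="./data", blobs_list=["datasets/v1/a.bin", "datasets/v1/b.bin"])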

I have implemented my own class derived from azure.storage.blob.ContainerClient to support this feature, but I would prefer it to be supported by the SDK out of the box.

@github-actions github-actions bot added Client This issue points to a problem in the data-plane of the library. customer-reported Issues that are reported by GitHub users external to the Azure organization. needs-team-attention Workflow: This issue needs attention from Azure service team or SDK team question The issue doesn't require a change to the product in order to be resolved. Most issues start as that Service Attention Workflow: This issue is responsible by Azure service team. Storage Storage Service (Queues, Blobs, Files) labels Mar 28, 2025

Thanks for the feedback! We are routing this to the appropriate team for follow-up. cc @jalauzon-msft @vincenttran-msft.


Hello @martinResearch. I'm an AI assistant for the azure-sdk-for-python repository. I have some suggestions that you can try out while the team gets back to you.

• This feature request targets adding a built-in method to synchronize local files and Azure blobs, similar to the functionality provided by azcopy sync (azcopy sync documentation).

• Currently, the Python SDK does not include a native synchronization method that compares last modification times or MD5 hashes out-of-the-box. We recommend using custom implementations derived from ContainerClient as an interim solution.

• As a future enhancement, consolidating such a sync method into the SDK could improve integration into Python workflows. Please consider providing detailed use cases and scenarios in your feature request to help prioritize its development.

The team will get back to you shortly; hopefully this helps in the meantime.

@jalauzon-msft jalauzon-msft added feature-request This issue requires a new behavior in the product in order be resolved. and removed question The issue doesn't require a change to the product in order to be resolved. Most issues start as that labels Mar 28, 2025
@jalauzon-msft
Member

Hi @martinResearch, thanks for reaching out. Unfortunately, while we do see the value in this type of feature, to be perfectly honest, this is not something we are likely to add to the SDK anytime soon. A sync function such as this is a very complicated feature that even the AzCopy team still encounters issues with, especially given it needs to be highly generalized to support many different scenarios. We simply don't have the engineering resources to implement/maintain such a feature at this time. Additionally, if we do ever introduce something like this, it would likely be in a separate package. If you have an implementation that works for you, I recommend you continue to use that.

@martinResearch
Author

my current implementation:

"""ContainerClientExtended - a helper class for accessing Azure Blob Storage.

ContainerClientExtended is a wrapper around azure.storage.blob.ContainerClient which
provides a few additional helper methods.
"""

from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timezone
import hashlib
import os
from pathlib import Path
import time
from typing import Iterable, Tuple, Union, Dict, Optional
import uuid

from azure.storage.blob import BlobProperties, ContainerClient
from azure.core.credentials import AzureNamedKeyCredential, AzureSasCredential, TokenCredential

class ContainerClientExtended(ContainerClient):
"""An extension of azure.storage.blob.ContainerClient which automatically performs authentication.

Args:
    account_url: The URI to the storage account, e.g. 'https://mydata.blob.core.windows.net'.
    container_name: The name of the blob container.
"""

    def __init__(
        self,
        account_url: str,
        container_name: str,
        timeout: int = 20,
        copy_dates: bool = True,
        credential: Optional[
            Union[str, Dict[str, str], AzureNamedKeyCredential, AzureSasCredential, "TokenCredential"]
        ] = None,
    ):
        """Performs authentication and initializes the container client.

        Args:
            account_url: URL of the Azure storage account.
            container_name: Name of the Azure blob storage container.
            timeout: Timeout in seconds for connections/reads from the container.
            copy_dates: Controls whether the last_modified date of the Azure blob is copied onto the local file when downloading.
            credential: Credential used to authenticate requests to the storage account.
        """
        super().__init__(
            account_url,
            container_name,
            credential=credential,
            connection_timeout=timeout,
            read_timeout=timeout,
        )
        self._copy_dates = copy_dates

    @classmethod
    def _local_copy_exists_md5(cls, blob_props: BlobProperties, local_file: Union[Path, str]) -> bool:
        """Checks if the blob already exists locally based on MD5 hash.

        Args:
            blob_props: Blob properties of the blob to check.
            local_file: Location of the local file.

        Returns:
            True if the local file exists and contains the same data.
        """
        local_file = Path(local_file)
        if local_file.exists():
            hash_md5 = hashlib.md5()
            with open(local_file, "rb") as f:
                for chunk in iter(lambda: f.read(4096), b""):
                    hash_md5.update(chunk)
            md5_local = hash_md5.digest()
            md5_remote = blob_props.content_settings.content_md5
            return md5_local == md5_remote
        else:
            return False

    @classmethod
    def _local_copy_exists_dates_and_size(
        cls, blob_props: BlobProperties, local_file: Union[Path, str], verbose: bool = False
    ) -> bool:
        """Checks if the blob already exists locally based on dates and file size.

        This is used as a fallback when the file is too large for Azure storage to compute the MD5.
        """
        local_file = Path(local_file)
        if not local_file.exists():
            return False

        local_file_stats = os.stat(local_file)

        local_file_last_modified_utc = datetime.fromtimestamp(local_file_stats.st_mtime, tz=timezone.utc)
        remote_file_last_modified_utc = blob_props["last_modified"]
        remote_file_size = blob_props["size"]
        local_file_size = local_file_stats.st_size
        if remote_file_last_modified_utc != local_file_last_modified_utc:
            if verbose:
                print(
                    f"The local and remote last modified dates are different for file {local_file}:\n"
                    f"Local last modified date utc: {local_file_last_modified_utc}\n"
                    f"Remote last modified date utc: {remote_file_last_modified_utc}"
                )
            return False
        elif remote_file_size != local_file_size:
            if verbose:
                print(
                    f"The local and remote file sizes are different for file {local_file}:\n"
                    f"Local file size:  {local_file_size}B\n"
                    f"Remote file size: {remote_file_size}B"
                )
            return False
        else:
            return True

    @classmethod
    def _local_copy_exists(cls, blob_props: BlobProperties, local_file: Union[Path, str]) -> bool:
        if blob_props.content_settings.content_md5:
            return cls._local_copy_exists_md5(blob_props=blob_props, local_file=local_file)
        else:
            # Azure storage does not compute MD5 hashes for large files.
            # Fall back to using the modification date and file size to detect if a local copy already exists.
            return cls._local_copy_exists_dates_and_size(blob_props=blob_props, local_file=local_file)

    def local_copy_exists(self, blob_name: str, file_path: Union[Path, str]) -> bool:
        """Check if the local copy of the file already exists."""
        file_path = Path(file_path)
        return self._local_copy_exists(self.get_blob_client(blob_name).get_blob_properties(), file_path)

    def download_blob_to_file(
        self, blob_name: str, file_path: Union[Path, str], overwrite: bool = False, verbose: bool = False
    ) -> None:
        """Downloads a blob and saves it to a file.

        Args:
            blob_name: Blob name, possibly including multiple path segments.
            file_path: Local file path where the blob will be saved.
            overwrite: Forces re-download if the local file already exists.
            verbose: Controls the verbosity of the function.
        """
        file_path = Path(file_path)

        blob_properties = self.get_blob_client(blob_name).get_blob_properties()

        if overwrite or not self._local_copy_exists(blob_properties, file_path):
            # Download the data to a temporary file first and then rename it, in order to
            # avoid ending up with a corrupted file if the process is interrupted while
            # the file is being downloaded.
            tmp_file = file_path.with_suffix(file_path.suffix + "." + uuid.uuid4().hex[:10] + ".tmp")
            file_path.parent.mkdir(exist_ok=True, parents=True)
            if verbose:
                print(f"    {blob_name} -> {file_path}")
            with open(tmp_file, "wb") as file:
                super().download_blob(blob_name).readinto(file)

            if self._copy_dates:
                # Copy the blob's last modified date onto the local file's last modified date.
                last_modified = blob_properties["last_modified"].timestamp()
                last_accessed = last_modified
                os.utime(tmp_file, (last_modified, last_accessed))

            if file_path.exists():
                file_path.unlink()
            tmp_file.rename(file_path)
        else:
            if verbose:
                print(f"    {blob_name} present locally, not overwritten")

    def upload_file_to_blob(self, blob_name: str, file_path: Union[Path, str], overwrite: bool = False) -> None:
        """Uploads a file to a blob.

        Args:
            blob_name: Blob name, possibly including multiple path segments.
            file_path: Local file path.
            overwrite: Whether the blob to be uploaded should overwrite the current data.
                       If True, upload_blob will overwrite the existing data. If set to False, the
                       operation will fail with ResourceExistsError.
        """
        with open(file_path, "rb") as data:
            super().upload_blob(blob_name, data, overwrite=overwrite)

    def download_blobs_to_files(
        self,
        blob_filename_pairs: Iterable[Tuple[str, str]],
        concurrency_limit: int = 1000,
        verbose: bool = False,
    ) -> int:
        """Downloads a list of files from an Azure blob container.

        Args:
            blob_filename_pairs: List of (blob name, local path) pairs.
            concurrency_limit: Maximum number of threads.
            verbose: Controls the verbosity of the function.

        Returns:
            The number of files that were downloaded.
        """
        if verbose:
            print("Checking files exist locally...", end="")

        start_time = time.perf_counter()

        with ThreadPoolExecutor(concurrency_limit) as executor:
            futures_local_exists = [
                (blob_path, local_file_path, executor.submit(self.local_copy_exists, blob_path, local_file_path))
                for blob_path, local_file_path in blob_filename_pairs
            ]

            files_to_download = [(future[0], future[1]) for future in futures_local_exists if not future[2].result()]

        finished_time = time.perf_counter()

        if verbose:
            print(f"done in {finished_time - start_time} seconds")

        # Download with overwrite set to True since we already checked that these files do not exist locally.

        if verbose:
            print(f"Downloading {len(files_to_download)} files...", end="")

        start_time = time.perf_counter()

        with ThreadPoolExecutor(concurrency_limit) as executor:
            futures_download = [
                executor.submit(self.download_blob_to_file, blob_path, local_file_path, overwrite=True, verbose=False)
                for blob_path, local_file_path in files_to_download
            ]

            # Collect the results in order to wait for all downloads to complete
            # and raise any errors that were thrown in the subtasks.
            for future in futures_download:
                future.result()

        nb_downloaded = len(files_to_download)

        finished_time = time.perf_counter()

        if verbose:
            print(f"done in {finished_time - start_time} seconds")

        return nb_downloaded
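
For reference, a minimal usage sketch of this class (the account URL, container name, blob names, and the use of DefaultAzureCredential from azure-identity are assumptions for illustration):

from azure.identity import DefaultAzureCredential

client = ContainerClientExtended(
    account_url="https://mydata.blob.core.windows.net",
    container_name="mycontainer",
    credential=DefaultAzureCredential(),
)

# Each pair maps a blob name to the local path it should be synchronized to.
blob_filename_pairs = [
    ("datasets/v1/a.bin", "data/a.bin"),
    ("datasets/v1/b.bin", "data/b.bin"),
]

# Only blobs whose local copy is missing or stale (by MD5, or by last modified date and size) are downloaded.
nb_downloaded = client.download_blobs_to_files(blob_filename_pairs, concurrency_limit=32, verbose=True)
print(f"{nb_downloaded} files downloaded")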
