Skip to main content

Documentation Index

Fetch the complete documentation index at: https://mintlify.com/sachnun/hugbucket/llms.txt

Use this file to discover all available pages before exploring further.

The bridge layer is the concrete implementation of the StorageBackend interface. It translates protocol-agnostic backend calls (like put_object and get_object_stream) into the multi-step sequence of HF Hub API calls and Xet CAS operations required by the Hugging Face storage protocol.

HFStorageBackend (Bridge)

HFStorageBackend in hugbucket/bridge.py is the main class. It is aliased as Bridge for backward-compatible imports:
hugbucket/bridge.py
@dataclass
class HFStorageBackend(StorageBackend):
    """Orchestrates S3 <-> HF Bucket operations."""

    config: Config
    hub: HubClient = field(init=False)
    cas: CASClient = field(init=False)
    _token_cache: dict[str, XetConnectionInfo] = field(...)
    _recon_cache: OrderedDict[str, tuple[float, Reconstruction]] = field(...)
    _xorb_cache: _XorbCache = field(init=False, repr=False)
    _file_info_cache: OrderedDict[str, tuple[float, BucketFile]] = field(...)

# Backward-compatible name kept for existing imports/tests.
Bridge = HFStorageBackend
On construction, __post_init__ creates a HubClient and a CASClient and initialises the xorb LRU cache with the configured byte limit.

HubClient

HubClient in hugbucket/hub/client.py is an async HTTP client for the HF Hub Bucket API. It manages a single aiohttp.ClientSession with a configurable connection pool.
async def whoami(self) -> str:
    """Get username associated with the HF token via /api/whoami-v2."""

async def get_xet_write_token(self, bucket_id: str) -> XetConnectionInfo:
    """Get Xet CAS write credentials."""

async def get_xet_read_token(self, bucket_id: str) -> XetConnectionInfo:
    """Get Xet CAS read credentials."""
XetConnectionInfo carries the CAS URL and short-lived access token returned by the token endpoints:
hugbucket/hub/client.py
@dataclass
class XetConnectionInfo:
    cas_url: str
    access_token: str
    token_expiration: int  # unix epoch
Token endpoints return the CAS URL and token via HTTP response headers (X-Xet-Cas-Url, X-Xet-Access-Token, X-Xet-Token-Expiration). list_buckets and list_bucket_tree follow pagination automatically using the Link: <url>; rel="next" header pattern.

CASClient

CASClient in hugbucket/xet/cas_client.py handles all communication with the Xet CAS endpoint.
async def upload_xorb(
    self,
    conn: XetConnectionInfo,
    xorb_hash: str,
    xorb_data: bytes,
) -> bool:
    """Upload a xorb to CAS. Returns True if newly inserted."""
    # POST {cas_url}/v1/xorbs/default/{xorb_hash}

async def upload_shard(
    self,
    conn: XetConnectionInfo,
    shard_data: bytes,
) -> int:
    """Upload a shard to CAS. Returns result code (0 or 1)."""
    # POST {cas_url}/v1/shards
Both upload_xorb and upload_shard retry on transient errors (HTTP 5xx, connection errors, timeouts) with exponential backoff. The default is 3 retries with a 1-second base delay (doubling each attempt):
hugbucket/config.py
cas_upload_timeout: int = 300   # 5 minutes per CAS request
cas_upload_retries: int = 3     # retry count for CAS xorb/shard uploads
cas_retry_base_delay: float = 1.0  # base delay (seconds) for exponential backoff

Namespace resolution

resolve_namespace() is called once at server startup:
hugbucket/bridge.py
async def resolve_namespace(self) -> str:
    """Resolve namespace from the configured HF token."""
    return await self.hub.whoami()
Internally whoami() calls GET /api/whoami-v2 and returns data["name"]. The result is stored in Config.hf_namespace and used by _bucket_id for every subsequent operation.

Bucket ID format

_bucket_id converts a bare bucket name into the {namespace}/{name} format expected by the HF Hub API:
hugbucket/bridge.py
def _bucket_id(self, bucket_name: str) -> str:
    """Convert S3 bucket name to HF bucket_id (namespace/name)."""
    if "/" in bucket_name:
        return bucket_name
    return f"{self.config.hf_namespace}/{bucket_name}"
If the bucket name already contains a / (e.g. when targeting an org namespace directly), it is used as-is.

Directory markers

S3 clients create folders by PUTting a zero-byte object with a trailing slash (e.g. my-folder/). HF Storage Buckets use virtual directories inferred from file paths and reject addFile calls for paths ending with /. HugBucket materialises empty folders by storing a hidden sentinel file inside them:
hugbucket/bridge.py
DIR_MARKER_FILENAME = ".hugbucket_keep"
DIR_MARKER_CONTENT = b"\n"  # must be non-empty so the full Xet upload runs
When put_object receives a trailing-slash key with zero bytes, it rewrites the key to {key}{DIR_MARKER_FILENAME} and replaces the content with DIR_MARKER_CONTENT before running the normal upload path. Delete operations expand the same way, and list_objects filters marker files from the returned contents while still counting them toward common_prefixes so empty folders show up in listings. head_directory checks for the marker first (fast path), then falls back to listing objects under the prefix if no marker exists:
hugbucket/bridge.py
async def head_directory(self, bucket: str, prefix: str) -> bool:
    bucket_id = self._bucket_id(bucket)

    # Fast path: check for the explicit directory marker
    marker = await self._get_file_info_cached(
        bucket_id, prefix + DIR_MARKER_FILENAME
    )
    if marker is not None:
        return True

    # Slow path: check if any objects exist under this prefix
    all_files = await self.hub.list_bucket_tree(
        bucket_id, prefix=prefix, recursive=True
    )
    return len(all_files) > 0

Server-side copy

Because Xet uses content-addressable storage, copy_object does not re-download or re-upload any data. It reads the source file’s xet_hash from the file info cache and registers the destination path with the same hash via the Hub batch API:
hugbucket/bridge.py
async def copy_object(
    self,
    src_bucket: str,
    src_key: str,
    dst_bucket: str,
    dst_key: str,
) -> dict:
    """Copy an object by registering the destination path with the same xetHash.

    Because Xet uses content-addressable storage, we don't need to
    re-download and re-upload the data — just register a new path
    pointing to the same content hash.
    """
    # Get source file metadata (using cache)
    src_file = await self._get_file_info_cached(src_bucket_id, src_key)

    # Register the new path with the same content hash
    await self.hub.batch_files(
        dst_bucket_id,
        add=[{
            "path": dst_key,
            "xetHash": src_file.xet_hash,
            "mtime": mtime_ms,
            "contentType": content_type,
        }],
    )
    self._invalidate_file_info(dst_bucket_id, dst_key)
A server-side copy between two buckets owned by the same namespace is a metadata-only operation. No bytes are transferred from or to the CAS.

Cache invalidation

After any mutation — put_object, delete_object, delete_objects, copy_object — the file info cache entry for the affected key is immediately evicted:
hugbucket/bridge.py
def _invalidate_file_info(self, bucket_id: str, key: str) -> None:
    """Remove a file_info entry from the cache after a mutation."""
    cache_key = f"{bucket_id}:{key}"
    self._file_info_cache.pop(cache_key, None)
This prevents stale metadata from being served to the next head_object or get_object_stream call while keeping the 30-second TTL in place for read-heavy workloads.