Storage#
Overview#
Every compute unit in dxflow can have its own storage module, enabling efficient data management across compute units. The storage module exposes a unified API for storing, retrieving, and managing data across AWS, Azure, Google Cloud, and other storage backends.
Usage#
Session-Level Storage Manager#
You can access storage at the session level for global storage operations:
from dxflow import Session
session = Session(email="your@email.com", password="your_password")
storage_manager = session.get_storage_manager()
# List files in your storage
storage_manager.list(provider="AWS", path="/")
# Upload a file
storage_manager.upload(
    src="/local/file.txt",
    dst="/remote/path/",
    provider="AWS"
)
# Download a file
storage_manager.download(
    src="/remote/file.txt",
    dst="/local/downloads/",
    provider="AWS"
)
Compute Unit Storage#
Each compute unit has its own storage context:
from dxflow import Session
session = Session(email="your@email.com", password="your_password")
compute_manager = session.get_compute_manager()
compute_unit = compute_manager.get_unit(name="Demo")
compute_unit.storage.ls()  # ls-like command: lists files in the unit's storage
d 4096 2025-02-08 03:51:41 .dx
d 4096 2024-12-12 10:07:47 .ipynb_checkpoints
d 4096 2024-06-25 07:00:17 simulation_outputs
d 4096 2025-04-10 22:01:07 boltz_inputs
d 4096 2024-08-16 22:55:39 data
d 4096 2024-05-13 11:07:00 support_files
d 4096 2024-12-14 09:24:10 mac
d 4096 2025-01-30 06:52:38 me
d 4096 2024-06-30 10:56:40 verified_results
d 4096 2025-05-05 23:26:22 transfer
- 137484 2024-12-12 09:31:58 preview.png
- 128229 2025-01-16 23:53:12 screenshot.png
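For programmatic use, list() can return structured entries instead of printing a listing. A minimal sketch, assuming return_info=True yields dict-like entries with "name" and "directory" keys (mirroring the item dicts used by the session-level manager; the exact shape may differ):
# Assumption: entries behave like dicts with "name" and "directory" keys.
entries = compute_unit.storage.list(path="/", return_info=True)
for entry in entries:
    label = "dir " if entry.get("directory") else "file"
    print(label, entry["name"])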
StorageManager Methods#
Core Methods:#
- list(provider="AWS", path="/"): Lists storage items at the specified path in a formatted table.
- update_path_items(path="/", storage_type='private', provider='AWS'): Updates the cache of items at the specified path (see the sketch after this list).
- update_all(provider="AWS", path="/"): Recursively updates all items, including subdirectories.
- upload(src, dst, provider="AWS", utype='private'): Uploads a file to the storage.
  - src: Local file path
  - dst: Remote destination path
  - provider: Cloud provider (AWS, AZURE, GCP)
  - utype: Upload type ('private' or 'public')
- download(src, dst=os.getcwd(), provider="AWS"): Downloads a file from the storage.
  - src: Remote file path
  - dst: Local destination directory
  - provider: Cloud provider
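The cache-related methods pair naturally with list(). A minimal sketch using only the calls documented above; the paths are illustrative:
# Refresh the cached view of one remote path, then list it.
storage_manager.update_path_items(path="/project/", provider="AWS")
storage_manager.list(provider="AWS", path="/project/")

# Recursively refresh everything under the root before a bulk operation.
storage_manager.update_all(provider="AWS", path="/")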
Unit Storage Methods#
When accessing storage through a compute unit:
- list(path="/", recursive=False, return_info=False): Lists storage items at the specified path.
- ls(path="/", recursive=False): Alias for the list() method.
- upload(src, dst): Uploads a file to the unit's storage.
- download(src, dst, create_dir=True): Downloads a file from the unit's storage.
- mkdir(path): Creates a directory in the storage (combined with the calls below in the sketch after this list).
- delete(path): Deletes a file or directory from the storage.
- copy(src, dst): Copies a file or directory within the storage.
- move(src, dst): Moves a file or directory within the storage.
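Taken together, these methods cover a typical housekeeping workflow on a unit. A minimal sketch using only the calls documented above; the paths are illustrative:
storage = compute_unit.storage

storage.mkdir("/archive")                    # create a target directory
storage.copy("/data/run1.csv", "/archive/")  # keep a copy of the raw data
storage.move("/preview.png", "/archive/")    # relocate a file
storage.delete("/scratch")                   # remove an obsolete directory

storage.upload(src="/local/config.yaml", dst="/")         # push a local file to the unit
storage.download(src="/archive/run1.csv", dst="/local/")  # pull a file back down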
Examples#
Working with Multiple Providers#
# AWS S3
storage_manager.upload(
    src="/local/data.csv",
    dst="/s3-bucket/data/",
    provider="AWS"
)

# Azure Blob Storage
storage_manager.upload(
    src="/local/model.pkl",
    dst="/container/models/",
    provider="AZURE"
)

# Google Cloud Storage
storage_manager.upload(
    src="/local/results.json",
    dst="/gcs-bucket/results/",
    provider="GCP"
)
Batch Operations#
import os

# Upload all CSV files in a directory
data_dir = "/local/data/"
for filename in os.listdir(data_dir):
    if filename.endswith('.csv'):
        storage_manager.upload(
            src=os.path.join(data_dir, filename),
            dst="/project/datasets/",
            provider="AWS"
        )
# Download all files from a remote directory
storage_manager.update_path_items(path="/results/", provider="AWS")
# _get_content_from_path is an underscore-prefixed internal helper;
# its interface may change between releases.
content = storage_manager._get_content_from_path("AWS", "/results/")
if content:
    for item in content:
        if not item.get('directory'):  # skip subdirectories
            storage_manager.download(
                src=f"/results/{item['name']}",
                dst="/local/results/",
                provider="AWS"
            )
Error Handling#
from dxflow.exceptions import StorageError
try:
    storage_manager.upload(
        src="/local/file.txt",
        dst="/remote/",
        provider="AWS"
    )
except StorageError as e:
    print(f"Upload failed: {e}")