Source code for ai.backend.client.vfolder

import asyncio
from pathlib import Path
import re
from typing import Sequence, Union
import zlib

import aiohttp
from tqdm import tqdm

from .base import api_function
from .exceptions import BackendAPIError, BackendClientError
from .request import Request, AttachedFile
from .cli.pretty import ProgressReportingReader

__all__ = (
    'VFolder',
)

_rx_slug = re.compile(r'^[a-zA-Z0-9]([a-zA-Z0-9._-]*[a-zA-Z0-9])?$')


[docs]class VFolder: session = None '''The client session instance that this function class is bound to.'''
[docs] @api_function @classmethod async def create(cls, name: str, host: str = None): assert _rx_slug.search(name) is not None, 'Invalid vfolder name format' rqst = Request(cls.session, 'POST', '/folders') rqst.set_json({ 'name': name, 'host': host, }) async with rqst.fetch() as resp: return await resp.json()
def __init__(self, name: str): assert _rx_slug.search(name) is not None, 'Invalid vfolder name format' self.name = name
[docs] @api_function @classmethod async def list(cls): rqst = Request(cls.session, 'GET', '/folders') async with rqst.fetch() as resp: return await resp.json()
[docs] @api_function @classmethod async def list_hosts(cls): rqst = Request(cls.session, 'GET', '/folders/_/hosts') async with rqst.fetch() as resp: return await resp.json()
[docs] @api_function async def info(self): rqst = Request(self.session, 'GET', '/folders/{0}'.format(self.name)) async with rqst.fetch() as resp: return await resp.json()
[docs] @api_function async def delete(self): rqst = Request(self.session, 'DELETE', '/folders/{0}'.format(self.name)) async with rqst.fetch(): return {}
[docs] @api_function async def rename(self, new_name): assert _rx_slug.search(new_name) is not None, 'Invalid vfolder name format' rqst = Request(self.session, 'POST', '/folders/{0}/rename'.format(self.name)) rqst.set_json({ 'new_name': new_name, }) async with rqst.fetch() as resp: self.name = new_name return await resp.text()
[docs] @api_function async def upload(self, files: Sequence[Union[str, Path]], basedir: Union[str, Path] = None, show_progress: bool = False): base_path = (Path.cwd() if basedir is None else Path(basedir).resolve()) files = [Path(file).resolve() for file in files] total_size = 0 for file_path in files: total_size += file_path.stat().st_size tqdm_obj = tqdm(desc='Uploading files', unit='bytes', unit_scale=True, total=total_size, disable=not show_progress) with tqdm_obj: attachments = [] for file_path in files: try: attachments.append(AttachedFile( str(file_path.relative_to(base_path)), ProgressReportingReader(str(file_path), tqdm_instance=tqdm_obj), 'application/octet-stream', )) except ValueError: msg = 'File "{0}" is outside of the base directory "{1}".' \ .format(file_path, base_path) raise ValueError(msg) from None rqst = Request(self.session, 'POST', '/folders/{}/upload'.format(self.name)) rqst.attach_files(attachments) async with rqst.fetch() as resp: return await resp.text()
[docs] @api_function async def mkdir(self, path: Union[str, Path]): rqst = Request(self.session, 'POST', '/folders/{}/mkdir'.format(self.name)) rqst.set_json({ 'path': path, }) async with rqst.fetch() as resp: return await resp.text()
[docs] @api_function async def delete_files(self, files: Sequence[Union[str, Path]], recursive: bool = False): rqst = Request(self.session, 'DELETE', '/folders/{}/delete_files'.format(self.name)) rqst.set_json({ 'files': files, 'recursive': recursive, }) async with rqst.fetch() as resp: return await resp.text()
[docs] @api_function async def download(self, files: Sequence[Union[str, Path]], show_progress: bool = False): rqst = Request(self.session, 'GET', '/folders/{}/download'.format(self.name)) rqst.set_json({ 'files': files, }) try: async with rqst.fetch() as resp: if resp.status // 100 != 2: raise BackendAPIError(resp.status, resp.reason, await resp.text()) total_bytes = int(resp.headers['X-TOTAL-PAYLOADS-LENGTH']) tqdm_obj = tqdm(desc='Downloading files', unit='bytes', unit_scale=True, total=total_bytes, disable=not show_progress) reader = aiohttp.MultipartReader.from_response(resp.raw_response) with tqdm_obj as pbar: acc_bytes = 0 while True: part = await reader.next() if part is None: break # It seems like that there's no automatic # decompression steps in multipart reader for # chuncked encoding. encoding = part.headers['Content-Encoding'] zlib_mode = (16 + zlib.MAX_WBITS if encoding == 'gzip' else -zlib.MAX_WBITS) decompressor = zlib.decompressobj(wbits=zlib_mode) fp = open(part.filename, 'wb') while True: # default chunk size: 8192 chunk = await part.read_chunk() if not chunk: break raw_chunk = decompressor.decompress(chunk) fp.write(raw_chunk) acc_bytes += len(raw_chunk) pbar.update(len(raw_chunk)) fp.close() pbar.update(total_bytes - acc_bytes) except (asyncio.CancelledError, asyncio.TimeoutError): # These exceptions must be bubbled up. raise except aiohttp.ClientError as e: msg = 'Request to the API endpoint has failed.\n' \ 'Check your network connection and/or the server status.' raise BackendClientError(msg) from e
[docs] @api_function async def list_files(self, path: Union[str, Path] = '.'): rqst = Request(self.session, 'GET', '/folders/{}/files'.format(self.name)) rqst.set_json({ 'path': path, }) async with rqst.fetch() as resp: return await resp.json()
[docs] @api_function async def invite(self, perm: str, emails: Sequence[str]): rqst = Request(self.session, 'POST', '/folders/{}/invite'.format(self.name)) rqst.set_json({ 'perm': perm, 'user_ids': emails, }) async with rqst.fetch() as resp: return await resp.json()
[docs] @api_function @classmethod async def invitations(cls): rqst = Request(cls.session, 'GET', '/folders/invitations/list') async with rqst.fetch() as resp: return await resp.json()
[docs] @api_function @classmethod async def accept_invitation(cls, inv_id: str, inv_ak: str): rqst = Request(cls.session, 'POST', '/folders/invitations/accept') rqst.set_json({'inv_id': inv_id, 'inv_ak': inv_ak}) async with rqst.fetch() as resp: return await resp.json()
[docs] @api_function @classmethod async def delete_invitation(cls, inv_id: str): rqst = Request(cls.session, 'DELETE', '/folders/invitations/delete') rqst.set_json({'inv_id': inv_id}) async with rqst.fetch() as resp: return await resp.json()