Source code for handtruck.client

  1# AWS S3 API Reference: https://docs.aws.amazon.com/AmazonS3/latest/API/Type_API_Reference.html
  2# Probably useful to anyone working on this module
  3
  4import hashlib
  5import io
  6import logging
  7import os
  8import typing as t
  9from collections import deque
 10from contextlib import suppress
 11from dataclasses import dataclass
 12from functools import partial
 13from http import HTTPStatus
 14from itertools import chain
 15from mimetypes import guess_type
 16from mmap import PAGESIZE
 17from pathlib import Path
 18from tempfile import NamedTemporaryFile
 19import textwrap
 20
 21import anyio
 22import anyio.streams.memory
 23from anyiomisc import asyncbackoff
 24from aws_request_signer import UNSIGNED_PAYLOAD
 25from httpx import URL, HTTPError, AsyncClient, Response, QueryParams, HTTPStatusError
 26
 27from ._async import generate_in_thread, run_in_thread, parallel_file_writer, ChunkSendStream
 28from ._xml import (
 29    AwsObjectMeta, create_complete_upload_request,
 30    parse_create_multipart_upload_id, parse_list_objects, parse_error,
 31)
 32from .credentials import (
 33    AbstractCredentials, collect_credentials,
 34)
 35from .exceptions import S3Error, ConflictError, PreconditionFailed
 36
 37
 38log = logging.getLogger(__name__)
 39
 40CHUNK_SIZE = 2 ** 16
 41
 42DONE = object()
 43EMPTY_STR_HASH = hashlib.sha256(b"").hexdigest()
 44# 5MB
 45
 46PART_SIZE = 5 * 1024 * 1024
 47HeadersType = t.Union[t.Dict]
 48
 49DataType = t.Optional[t.Mapping[str, t.Any]]
 50RequestContent = t.Optional[t.Union[str, bytes, t.Iterable[bytes], t.AsyncIterable[bytes]]]
 51PrimitiveData = t.Optional[t.Union[str, int, float, bool]]
 52QueryParamTypes = t.Union[
 53    QueryParams,
 54    t.Mapping[str, t.Union[PrimitiveData, t.Sequence[PrimitiveData]]],
 55    t.List[t.Tuple[str, PrimitiveData]],
 56    t.Tuple[t.Tuple[str, PrimitiveData], ...],
 57    str,
 58    bytes,
 59]
 60
 61
 62@dataclass
 63class HEADERS:
 64    CONTENT_LENGTH = 'Content-Length'
 65    CONTENT_TYPE = 'Content-Type'
 66
 67
 68@run_in_thread
 69def concat_files(
 70    target_file: Path, files: t.List[t.IO[bytes]], buffer_size: int,
 71) -> None:
 72    with target_file.open("ab") as fp:
 73        for file in files:
 74            file.seek(0)
 75            while True:
 76                chunk = file.read(buffer_size)
 77                if not chunk:
 78                    break
 79                fp.write(chunk)
 80            file.close()
 81
 82
 83@run_in_thread
 84def write_from_start(
 85    file: io.BytesIO, chunk: bytes, range_start: int, pos: int,
 86) -> None:
 87    file.seek(pos - range_start)
 88    file.write(chunk)
 89
 90
 91@generate_in_thread
 92def gen_without_hash(
 93    stream: t.Iterable[bytes],
 94) -> t.Generator[t.Tuple[None, bytes], None, None]:
 95    for data in stream:
 96        yield (None, data)
 97
 98
 99@generate_in_thread
100def gen_with_hash(
101    stream: t.Iterable[bytes],
102) -> t.Generator[t.Tuple[str, bytes], None, None]:
103    for data in stream:
104        yield hashlib.sha256(data).hexdigest(), data
105
106
def file_sender(
    file_name: str|os.PathLike, chunk_size: int = CHUNK_SIZE,
) -> t.Iterable[bytes]:
    """
    Lazily read a file in binary mode, one chunk at a time.

    Args:
        file_name: path of the file to read
        chunk_size: number of bytes requested per read

    Yields:
        Consecutive chunks of the file; iteration stops at the first
        empty read.
    """
    with open(file_name, "rb") as fp:
        # ``iter`` with a sentinel keeps calling ``fp.read(chunk_size)``
        # until it returns ``b""``.
        yield from iter(partial(fp.read, chunk_size), b"")
116
117
# ``file_sender`` wrapped with ``generate_in_thread``; ``S3Client.put_file``
# passes the result as the request content.
async_file_sender = generate_in_thread(file_sender)

# Item exchanged between the part producer and the upload workers:
# ``(part_no, part_hash, part)`` as sent by ``S3Client._parts_generator``.
MultiPart = tuple

#: Type hint for anything expecting a specific object
type S3Object = str | URL
#: Type hint for anything expecting a bucket
type S3Bucket = str | URL
126
class S3Client:
    """
    Asynchronous client for an S3-style endpoint.

    Requests are signed with the configured credentials and sent through
    the HTTP client handed to the constructor.
    """

    def __init__(
        self, client: AsyncClient, url: t.Union[URL, str],
        secret_access_key: t.Optional[str] = None,
        access_key_id: t.Optional[str] = None,
        session_token: t.Optional[str] = None,
        region: str = "",
        credentials: t.Optional[AbstractCredentials] = None,
    ):
        """
        Args:
            client: HTTP client used to send every request
            url: S3 endpoint, see :ref:`urls`
            secret_access_key: forwarded to ``collect_credentials`` when
                ``credentials`` is not given
            access_key_id: forwarded to ``collect_credentials`` when
                ``credentials`` is not given
            session_token: forwarded to ``collect_credentials`` when
                ``credentials`` is not given
            region: forwarded to ``collect_credentials`` when
                ``credentials`` is not given
            credentials: ready-made credentials; when given, the four
                arguments above are not used

        Raises:
            ValueError: if the resulting credentials object is falsy
        """
        url = URL(url)
        if credentials is None:
            credentials = collect_credentials(
                url=url,
                access_key_id=access_key_id,
                region=region,
                secret_access_key=secret_access_key,
                session_token=session_token,
            )

        if not credentials:
            raise ValueError(
                f"Credentials {credentials!r} is incomplete",
            )

        self._url = url
        self._client = client
        self._credentials = credentials

    def __repr__(self):
        return f"<{type(self).__name__} {self._url!r} creds={self._credentials!r}>"

    @property
    def url(self) -> URL:
        """
        S3 Endpoint, see :ref:`urls`.
        """
        return self._url

    def _mkurl(self, path: S3Object|S3Bucket) -> URL:
        """
        Combine a path/s3-url with the endpoint to create the real URL.

        Raises:
            ValueError: for relative S3 URLs and for URL schemes other than
                ``s3``, ``s3+*``, ``*+s3``, ``http`` and ``https``
            TypeError: if ``path`` is neither a string nor a URL
        """
        # This method is where host-based buckets would go.
        if not path:
            # Referring to the implicit bucket
            return self._url
        if isinstance(path, str):
            # Classical string-based "<bucket>/<object>" (or whatever)
            return self._url.join(path)
        if not isinstance(path, URL):
            raise TypeError(f"Don't know how to make a request URL out of {path!r}")

        scheme = path.scheme
        if scheme in ("http", "https"):
            # Full URL, I guess
            return self._url.join(path)
        if not (scheme == "s3" or scheme.startswith("s3+") or scheme.endswith("+s3")):
            raise ValueError(f"Don't know how to make a request URL out of {path!r}")

        # s3://<bucket>/<object>
        # s3+<whatever>://<bucket>/<object>
        # <whatever>+s3://<bucket>/<object>
        object_path = path.path
        if object_path in ("", "/"):
            # No object: either an explicit bucket or the implied one
            return self._url.join(path.host) if path.host else self._url
        if not object_path.startswith("/"):
            raise ValueError(f"Relative S3 URLs not supported ({path!r})")
        if not path.host:
            # Implied bucket: drop the leading slash so the key is joined
            # relative to the endpoint
            return self._url.join(object_path[1:])
        # Bucket and object given
        return self._url.join(f"{path.host}{object_path}")

    async def request(
        self, method: str, path: S3Object|S3Bucket,
        headers: t.Optional[HeadersType] = None,
        params: t.Optional[QueryParamTypes] = None,
        content: t.Optional[RequestContent] = None,
        content_sha256: t.Optional[str] = None,
        **kwargs,
    ) -> Response:
        """
        Make an arbitrary request.

        Args:
            method: HTTP method
            path: object/bucket the request is for, resolved by ``_mkurl``
            headers: extra request headers; the mapping is copied, never
                mutated
            params: query parameters merged into the URL
            content: request body
            content_sha256: payload hash handed to the signer; when a body
                is given without a hash the unsigned-payload marker is used
            **kwargs: forwarded to the HTTP client's ``request``

        Returns:
            : The response, if its status is not an error.

        Raises:
            HTTPError: when the response status is an error and the body is
                not a parseable S3 error document
            Exception: whatever ``parse_error`` builds from an XML error
                body, chained to the original HTTP error

        :meta private:
        """
        # Returns a fresh dict, so the signature headers added below never
        # leak into the caller's mapping.
        headers = self._prepare_headers(headers)

        if content is not None and content_sha256 is None:
            content_sha256 = UNSIGNED_PAYLOAD

        url = self._mkurl(path)
        if params:
            url = url.copy_merge_params(params)

        headers.update(
            self._credentials.signer.sign_with_headers(
                method, str(url), headers=headers, content_hash=content_sha256,
            ),
        )
        resp = await self._client.request(
            method, url, headers=headers, content=content, **kwargs,
        )
        try:
            resp.raise_for_status()
        except HTTPError as herr:
            # Annotate the original exception with useful information.
            # All of this is best effort: failing to annotate must never
            # mask the HTTP error itself.
            with suppress(Exception):
                herr.add_note(f"URL: {herr.request.url}")
            if hasattr(herr, "response"):
                with suppress(Exception):
                    herr.add_note(f"Body:\n{textwrap.indent(herr.response.text, ' ')}")

            err = None
            # Compare the bare media type so that a value carrying
            # parameters ("application/xml; charset=utf-8") still matches.
            content_type = resp.headers.get('Content-Type', '')
            media_type = content_type.partition(';')[0].strip().lower()
            if media_type == 'application/xml' and b'<Error>' in resp.content:
                # It looks like error XML, wrap it up so it's useful
                with suppress(ValueError):
                    err = parse_error(resp.content)

            if err is None:
                # Not error XML (or bad XML): re-raise the HTTP error as is
                raise

            with suppress(Exception):
                err.request = herr.request
            if hasattr(herr, "response"):
                with suppress(Exception):
                    err.response = herr.response
            raise err from herr
        return resp
[docs] 272 async def get(self, object_name: S3Object|S3Bucket, **kwargs) -> Response: 273 """ 274 Get an object. 275 276 Returns: 277 : The object gotten. 278 """ 279 return await self.request("GET", object_name, **kwargs)
280
[docs] 281 async def head( 282 self, object_name: S3Object|S3Bucket, 283 content_sha256=EMPTY_STR_HASH, 284 **kwargs, 285 ) -> Response: 286 """ 287 Get an object's headers. 288 289 Returns: 290 : The object gotten. 291 """ 292 return await self.request( 293 "HEAD", object_name, content_sha256=content_sha256, **kwargs, 294 )
295
[docs] 296 async def delete( 297 self, object_name: S3Object|S3Bucket, 298 content_sha256=EMPTY_STR_HASH, 299 **kwargs, 300 ) -> Response: 301 """ 302 Delete an object. 303 """ 304 return await self.request( 305 "DELETE", object_name, content_sha256=content_sha256, **kwargs, 306 )
307 308 @staticmethod 309 def _make_headers(headers: t.Optional[HeadersType]) -> dict: 310 headers = dict(headers or {}) 311 return headers 312 313 def _prepare_headers( 314 self, headers: t.Optional[HeadersType], 315 file_path: str = "", 316 ) -> dict: 317 headers = self._make_headers(headers) 318 319 if HEADERS.CONTENT_TYPE not in headers: 320 content_type = guess_type(file_path)[0] 321 if content_type is None: 322 content_type = "application/octet-stream" 323 324 headers[HEADERS.CONTENT_TYPE] = content_type 325 326 return headers 327
[docs] 328 async def put( 329 self, object_name: S3Object|S3Bucket, 330 content: RequestContent, 331 **kwargs, 332 ) -> Response: 333 """ 334 Update an object. 335 """ 336 return await self.request("PUT", object_name, content=content, **kwargs)
337
[docs] 338 async def post( 339 self, object_name: S3Object|S3Bucket, 340 content: RequestContent = None, 341 **kwargs, 342 ) -> Response: 343 return await self.request("POST", object_name, content=content, **kwargs)
344
[docs] 345 async def put_file( 346 self, object_name: S3Object, 347 file_path: str|os.PathLike, 348 *, headers: t.Optional[HeadersType] = None, 349 chunk_size: int = CHUNK_SIZE, content_sha256: t.Optional[str] = None, 350 ) -> Response: 351 """ 352 Update an object from a file (by path) 353 """ 354 headers = self._prepare_headers(headers, str(file_path)) 355 return await self.put( 356 object_name, 357 headers=headers, 358 content=async_file_sender( 359 file_path, 360 chunk_size=chunk_size, 361 ), 362 content_sha256=content_sha256, 363 )
364 365 @asyncbackoff( 366 None, None, 0, 367 max_tries=3, exceptions=(HTTPError,), 368 ) 369 async def _create_multipart_upload( 370 self, 371 object_name: S3Object, 372 headers: t.Optional[HeadersType] = None, 373 ) -> str: 374 resp = await self.post( 375 object_name, 376 headers=headers, 377 params={"uploads": 1}, 378 content_sha256=EMPTY_STR_HASH, 379 ) 380 payload = resp.read() 381 return parse_create_multipart_upload_id(payload) 382 383 @asyncbackoff( 384 None, None, 0, 385 max_tries=3, exceptions=(S3Error, HTTPError), 386 ) 387 async def _complete_multipart_upload( 388 self, 389 upload_id: str, 390 object_name: S3Object, 391 parts: t.List[t.Tuple[int, str]], 392 ) -> None: 393 complete_upload_request = create_complete_upload_request(parts) 394 resp = await self.post( 395 object_name, 396 headers={"Content-Type": "text/xml"}, 397 params={"uploadId": upload_id}, 398 content=complete_upload_request, 399 content_sha256=hashlib.sha256(complete_upload_request).hexdigest(), 400 ) 401 402 async def _put_part( 403 self, 404 upload_id: str, 405 object_name: S3Object, 406 part_no: int, 407 content: RequestContent, 408 content_sha256: str, 409 **kwargs, 410 ) -> str: 411 resp = await self.put( 412 object_name, 413 params={"partNumber": part_no, "uploadId": upload_id}, 414 content=content, 415 content_sha256=content_sha256, 416 **kwargs, 417 ) 418 return resp.headers["Etag"].strip('"') 419 420 async def _part_uploader( 421 self, 422 upload_id: str, 423 object_name: S3Object, 424 parts_stream: anyio.streams.memory.MemoryObjectReceiveStream[MultiPart], 425 results_queue: deque, 426 part_upload_tries: int, 427 **kwargs, 428 ) -> None: 429 backoff = asyncbackoff( 430 None, None, 431 max_tries=part_upload_tries, 432 exceptions=(HTTPError,), 433 ) 434 async for part_no, part_hash, part in parts_stream: 435 etag = await backoff(self._put_part)( 436 upload_id=upload_id, 437 object_name=object_name, 438 part_no=part_no, 439 content=part, 440 content_sha256=part_hash, 441 
**kwargs, 442 ) 443 log.debug( 444 "Etag for part %d of %s is %s", part_no, upload_id, etag, 445 ) 446 results_queue.append((part_no, etag)) 447
[docs] 448 async def put_file_multipart( 449 self, 450 object_name: S3Object, 451 file_path: str|os.PathLike, 452 *, 453 headers: t.Optional[HeadersType] = None, 454 part_size: int = PART_SIZE, 455 workers_count: int = 1, 456 max_size: t.Optional[int] = None, 457 part_upload_tries: int = 3, 458 calculate_content_sha256: bool = True, 459 **kwargs, 460 ) -> None: 461 """ 462 Upload data from a file with multipart upload 463 464 Args: 465 object_name: key in s3 466 file_path: path to a file for upload 467 headers: additional headers, such as Content-Type 468 part_size: size of a chunk to send (recommended: >5Mb) 469 workers_count: count of coroutines for asyncronous parts uploading 470 max_size: maximum size of a queue with data to send (should be 471 at least ``workers_count``) 472 part_upload_tries: how many times trying to put part to s3 before fail 473 calculate_content_sha256: whether to calculate sha256 hash of a part 474 for integrity purposes 475 """ 476 log.debug( 477 "Going to multipart upload %s to %s with part size %d", 478 file_path, object_name, part_size, 479 ) 480 await self.put_multipart( 481 object_name, 482 file_sender( 483 file_path, 484 chunk_size=part_size, 485 ), 486 headers=headers, 487 workers_count=workers_count, 488 max_size=max_size, 489 part_upload_tries=part_upload_tries, 490 calculate_content_sha256=calculate_content_sha256, 491 **kwargs, 492 )
493 494 async def _parts_generator( 495 self, gen: t.AsyncIterable[tuple], workers_count: int, parts_stream: anyio.streams.memory.MemoryObjectSendStream[MultiPart], 496 ) -> int: 497 part_no = 1 498 async with parts_stream: 499 async for part_hash, part in gen: 500 log.debug( 501 "Reading part %d (%d bytes)", part_no, len(part), 502 ) 503 await parts_stream.send((part_no, part_hash, part)) 504 part_no += 1 505 506 return part_no 507
[docs] 508 async def put_multipart( 509 self, 510 object_name: S3Object, 511 content: t.Iterable[bytes], 512 *, 513 headers: t.Optional[HeadersType] = None, 514 workers_count: int = 1, 515 max_size: t.Optional[int] = None, 516 part_upload_tries: int = 3, 517 calculate_content_sha256: bool = True, 518 **kwargs, 519 ) -> None: 520 """ 521 Send data from iterable with multipart upload 522 523 Args: 524 object_name: key in s3 525 data: any iterable that returns chunks of bytes 526 headers: additional headers, such as Content-Type 527 workers_count: count of coroutines for asyncronous parts uploading 528 max_size: maximum size of a queue with data to send (should be 529 at least ``workers_count``) 530 part_upload_tries: how many times trying to put part to s3 before fail 531 calculate_content_sha256: whether to calculate sha256 hash of a part 532 for integrity purposes 533 """ 534 if workers_count < 1: 535 raise ValueError( 536 f"Workers count should be > 0. Got {workers_count}", 537 ) 538 max_size = max_size or workers_count 539 540 upload_id = await self._create_multipart_upload( 541 object_name, 542 headers=headers, 543 ) 544 log.debug("Got upload id %s for %s", upload_id, object_name) 545 546 results_queue: deque = deque() 547 try: 548 async with anyio.create_task_group() as tg: 549 send_stream, receive_stream = anyio.create_memory_object_stream() 550 for wid in range(workers_count): 551 tg.start_soon(partial( 552 self._part_uploader, 553 upload_id, 554 object_name, 555 receive_stream.clone(), 556 results_queue, 557 part_upload_tries, 558 **kwargs, 559 ), name=f"put-worker-{upload_id}@{wid}") 560 # Get rid of our copy 561 receive_stream.close() 562 del receive_stream 563 564 if calculate_content_sha256: 565 gen = gen_with_hash(content) 566 else: 567 gen = gen_without_hash(content) 568 569 part_no = await self._parts_generator(gen, workers_count, send_stream) 570 except* Exception as excgroup: 571 for exc in excgroup.exceptions: 572 raise exc from None 573 574 
log.debug( 575 "All parts (#%d) of %s are uploaded to %s", 576 part_no - 1, upload_id, object_name, 577 ) 578 579 # Parts should be in ascending order 580 parts = sorted(results_queue, key=lambda x: x[0]) 581 await self._complete_multipart_upload( 582 upload_id, object_name, parts, 583 )
584 585 async def _download_range( 586 self, 587 object_name: S3Object, 588 writer: ChunkSendStream, 589 *, 590 etag: str, 591 pos: int, 592 range_start: int, 593 req_range_start: int, 594 req_range_end: int, 595 buffer_size: int, 596 headers: t.Optional[HeadersType] = None, 597 **kwargs, 598 ) -> None: 599 """ 600 Downloading range [req_range_start:req_range_end] to `file` 601 """ 602 log.debug( 603 "Downloading %s from %d to %d", 604 object_name, 605 req_range_start, 606 req_range_end, 607 ) 608 if not headers: 609 headers = {} 610 headers = headers.copy() 611 headers["Range"] = f"bytes={req_range_start}-{req_range_end}" 612 headers["If-Match"] = etag 613 614 resp = await self.get(object_name, headers=headers, **kwargs) 615 if resp.status_code not in (HTTPStatus.PARTIAL_CONTENT, HTTPStatus.OK): 616 raise HTTPStatusError( 617 f"Got wrong status code {resp.status_code} on range download " 618 f"of {object_name}", 619 request=resp.request, response=resp 620 ) 621 assert 'Content-Range' in resp.headers 622 assert resp.headers['Content-Range'].startswith(f"bytes {req_range_start}-{req_range_end}/") 623 # FIXME: Handle OK 624 # FIXME: Handle Content-Range being different from requested 625 pos = req_range_start 626 async for chunk in resp.aiter_bytes(buffer_size): 627 if not chunk: 628 break 629 await writer.send((pos, chunk)) 630 pos += len(chunk) 631 632 async def _download_worker( 633 self, 634 object_name: S3Object, 635 writer: ChunkSendStream, 636 *, 637 etag: str, 638 range_step: int, 639 range_start: int, 640 range_end: int, 641 buffer_size: int, 642 range_get_tries: int = 3, 643 headers: t.Optional[HeadersType] = None, 644 **kwargs, 645 ) -> None: 646 """ 647 Downloads data in range `[range_start, range_end)` 648 with step `range_step` to file `file_path`. 649 Uses `etag` to make sure that file wasn't changed in the process. 
650 """ 651 log.debug( 652 "Starting download worker for range [%d:%d]", 653 range_start, 654 range_end, 655 ) 656 async with writer: 657 backoff = asyncbackoff( 658 None, None, 659 max_tries=range_get_tries, 660 exceptions=(HTTPError,), 661 ) 662 req_range_end = range_start 663 for req_range_start in range(range_start, range_end, range_step): 664 req_range_end += range_step 665 if req_range_end > range_end: 666 req_range_end = range_end 667 await backoff(self._download_range)( 668 object_name, 669 writer, 670 etag=etag, 671 pos=(req_range_start - range_start), 672 range_start=range_start, 673 req_range_start=req_range_start, 674 req_range_end=req_range_end - 1, 675 buffer_size=buffer_size, 676 headers=headers, 677 **kwargs, 678 ) 679
[docs] 680 async def get_file_parallel( 681 self, 682 object_name: S3Object, 683 file_path: str|os.PathLike, 684 *, 685 headers: t.Optional[HeadersType] = None, 686 range_step: int = PART_SIZE, 687 workers_count: int = 1, 688 range_get_tries: int = 3, 689 buffer_size: int = PAGESIZE * 32, 690 **kwargs, 691 ) -> None: 692 """ 693 Download object in parallel with requests with Range. If file changed 694 while download is in progress, an error will be raised. 695 696 Args: 697 object_name: s3 key to download 698 file_path: target file path 699 headers: additional headers 700 range_step: how much data will be downloaded in single HTTP request 701 workers_count: count of parallel workers 702 range_get_tries: count of tries to download each range 703 buffer_size: size of a buffer for on the fly data 704 """ 705 file_path = anyio.Path(file_path) 706 resp = await self.head(object_name, headers=headers) 707 708 etag = resp.headers["Etag"] 709 file_size = int(resp.headers["Content-Length"]) 710 log.debug( 711 "Object's %s etag is %s and size is %d", 712 object_name, 713 etag, 714 file_size, 715 ) 716 717 worker_range_size = file_size // workers_count 718 range_end = 0 719 try: 720 try: 721 async with ( 722 await anyio.open_file(file_path, "w+b") as fp, 723 parallel_file_writer(fp) as pfw, 724 anyio.create_task_group() as tg, 725 ): 726 for range_start in range(0, file_size, worker_range_size): 727 range_end += worker_range_size 728 if range_end > file_size: 729 range_end = file_size 730 tg.start_soon(partial( 731 self._download_worker, 732 object_name, 733 await pfw.get_block(range_start, range_end), 734 buffer_size=buffer_size, 735 etag=etag, 736 headers=headers, 737 range_end=range_end, 738 range_get_tries=range_get_tries, 739 range_start=range_start, 740 range_step=range_step, 741 **kwargs, 742 ), name=f"get-worker@{range_start}") 743 except* PreconditionFailed as excgroup: 744 raise ConflictError(f"Object {object_name!r} changed while downloading") from excgroup 745 
except* Exception as excgroup: 746 # Unwrap and raise just one 747 for exc in excgroup.exceptions: 748 raise exc from None 749 750 except Exception: 751 log.exception( 752 "Error on file download. Removing possibly incomplete file %s", 753 file_path, 754 ) 755 with suppress(FileNotFoundError): 756 await file_path.unlink() 757 raise
758
[docs] 759 async def list_objects_v2( 760 self, 761 object_name: str|URL|None = None, 762 *, 763 bucket: S3Bucket|None = None, 764 prefix: t.Optional[t.Union[str, Path]] = None, 765 delimiter: t.Optional[str] = None, 766 max_keys: t.Optional[int] = None, 767 start_after: t.Optional[str] = None, 768 ) -> t.AsyncIterator[t.List[AwsObjectMeta]]: 769 """ 770 List objects in bucket. 771 772 Args: 773 object_name: 774 path to listing endpoint, defaults to ``'/'``; a ``bucket`` value is 775 prepended to this value if provided. 776 prefix: 777 limits the response to keys that begin with the specified 778 prefix 779 delimiter: a delimiter is a character you use to group keys 780 max_keys: maximum number of keys returned in the response 781 start_after: keys to start listing after 782 783 Returns: 784 : An iterator over lists of metadata objects, each corresponding 785 to an individual response result (typically limited to 1000 keys). 786 787 .. note:: 788 789 Despite the name :meth:`list_objects` is newer and probably more ergonomic. 790 """ 791 792 params = { 793 "list-type": "2", 794 } 795 796 if prefix: 797 params["prefix"] = str(prefix) 798 799 if delimiter: 800 params["delimiter"] = delimiter 801 802 if max_keys: 803 params["max-keys"] = str(max_keys) 804 805 if start_after: 806 params["start-after"] = start_after 807 808 if bucket and object_name: 809 raise ValueError("Must give either bucket or object_name") 810 811 while True: 812 resp = await self.get(bucket or object_name or '', params=params, headers={'Accept': 'application/xml'}) 813 payload = resp.content 814 metadata, continuation_token = parse_list_objects(payload) 815 if not metadata: 816 break 817 yield metadata 818 if not continuation_token: 819 break 820 params["continuation-token"] = continuation_token
821
[docs] 822 async def list_objects( 823 self, 824 bucket: S3Bucket|None=None, 825 *, 826 prefix: str|None = None, 827 delimiter: t.Optional[str] = None, 828 max_keys: t.Optional[int] = None, 829 start_after: t.Optional[str] = None, 830 ) -> t.AsyncIterator[AwsObjectMeta]: 831 """ 832 List objects in bucket. 833 834 Args: 835 bucket: 836 Name of the bucket to list, or :data:`None` if the bucket is in the endpoint. 837 prefix: 838 limits the response to keys that begin with the specified 839 prefix 840 delimiter: a delimiter is a character you use to group keys 841 max_keys: maximum number of keys returned in the response 842 start_after: keys to start listing after 843 844 Returns: 845 : Iterator over metadata objects 846 """ 847 params = { 848 "list-type": "2", 849 } 850 851 if prefix: 852 params["prefix"] = str(prefix) 853 854 if delimiter: 855 params["delimiter"] = delimiter 856 857 if max_keys: 858 params["max-keys"] = str(max_keys) 859 860 if start_after: 861 params["start-after"] = start_after 862 863 while True: 864 resp = await self.get(bucket or '', params=params, headers={'Accept': 'application/xml'}) 865 payload = resp.content 866 metadata, continuation_token = parse_list_objects(payload) 867 if not metadata: 868 break 869 for objmeta in metadata: 870 yield objmeta 871 if not continuation_token: 872 break 873 params["continuation-token"] = continuation_token