# Source code for handtruck.client

# AWS S3 API Reference: https://docs.aws.amazon.com/AmazonS3/latest/API/Type_API_Reference.html
# Probably useful to anyone working on this module.

import hashlib
import io
import logging
import os
import shutil
import textwrap
import typing as t
from collections import deque
from contextlib import ExitStack, aclosing, suppress
from dataclasses import dataclass
from functools import partial
from http import HTTPStatus
from itertools import chain
from mimetypes import guess_type
from mmap import PAGESIZE
from pathlib import Path
from tempfile import NamedTemporaryFile

import anyio
import anyio.streams.memory
from anyiomisc import asyncbackoff
from aws_request_signer import UNSIGNED_PAYLOAD
from httpx import URL, HTTPError, AsyncClient, Response, QueryParams, HTTPStatusError

from ._async import generate_in_thread, run_in_thread, parallel_file_writer, ChunkSendStream
from ._xml import (
    AwsObjectMeta, create_complete_upload_request,
    parse_create_multipart_upload_id, parse_list_objects, parse_error,
)
from .credentials import (
    AbstractCredentials, collect_credentials,
)
from .exceptions import S3Error, ConflictError, PreconditionFailed


 38log = logging.getLogger(__name__)
 39
 40CHUNK_SIZE = 2 ** 16
 41
 42DONE = object()
 43EMPTY_STR_HASH = hashlib.sha256(b"").hexdigest()
 44# 5MB
 45
 46PART_SIZE = 5 * 1024 * 1024
 47HeadersType = t.Union[t.Dict]
 48
 49DataType = t.Optional[t.Mapping[str, t.Any]]
 50RequestContent = t.Optional[t.Union[str, bytes, t.Iterable[bytes], t.AsyncIterable[bytes]]]
 51PrimitiveData = t.Optional[t.Union[str, int, float, bool]]
 52QueryParamTypes = t.Union[
 53    QueryParams,
 54    t.Mapping[str, t.Union[PrimitiveData, t.Sequence[PrimitiveData]]],
 55    t.List[t.Tuple[str, PrimitiveData]],
 56    t.Tuple[t.Tuple[str, PrimitiveData], ...],
 57    str,
 58    bytes,
 59]
 60
 61
 62@dataclass
 63class HEADERS:
 64    CONTENT_LENGTH = 'Content-Length'
 65    CONTENT_TYPE = 'Content-Type'
 66
 67
 68@run_in_thread
 69def concat_files(
 70    target_file: Path, files: t.List[t.IO[bytes]], buffer_size: int,
 71) -> None:
 72    with target_file.open("ab") as fp:
 73        for file in files:
 74            file.seek(0)
 75            while True:
 76                chunk = file.read(buffer_size)
 77                if not chunk:
 78                    break
 79                fp.write(chunk)
 80            file.close()
 81
 82
 83@run_in_thread
 84def write_from_start(
 85    file: io.BytesIO, chunk: bytes, range_start: int, pos: int,
 86) -> None:
 87    file.seek(pos - range_start)
 88    file.write(chunk)
 89
 90
 91@generate_in_thread
 92def gen_without_hash(
 93    stream: t.Iterable[bytes],
 94) -> t.Generator[t.Tuple[None, bytes], None, None]:
 95    for data in stream:
 96        yield (None, data)
 97
 98
 99@generate_in_thread
100def gen_with_hash(
101    stream: t.Iterable[bytes],
102) -> t.Generator[t.Tuple[str, bytes], None, None]:
103    for data in stream:
104        yield hashlib.sha256(data).hexdigest(), data
105
106
def file_sender(
    file_name: str|os.PathLike, chunk_size: int = CHUNK_SIZE,
) -> t.Iterable[bytes]:
    """
    Lazily read ``file_name`` in binary mode.

    Yields chunks of at most ``chunk_size`` bytes until EOF. The file is
    closed when iteration ends or the generator is closed.
    """
    with open(file_name, "rb") as source:
        while chunk := source.read(chunk_size):
            yield chunk


118async_file_sender = generate_in_thread(file_sender)
119
120MultiPart = tuple
121
122#: Type hint for anything expecting a specific object
123type S3Object = str | URL
124#: Type hint for anything expecting a bucket
125type S3Bucket = str | URL
126
[docs] 127class S3Client: 128 def __init__( 129 self, client: AsyncClient, url: t.Union[URL, str], 130 secret_access_key: t.Optional[str] = None, 131 access_key_id: t.Optional[str] = None, 132 session_token: t.Optional[str] = None, 133 region: str = "", 134 credentials: t.Optional[AbstractCredentials] = None, 135 ): 136 url = URL(url) 137 if credentials is None: 138 credentials = collect_credentials( 139 url=url, 140 access_key_id=access_key_id, 141 region=region, 142 secret_access_key=secret_access_key, 143 session_token=session_token, 144 ) 145 146 if not credentials: 147 raise ValueError( 148 f"Credentials {credentials!r} is incomplete", 149 ) 150 151 self._url = url 152 self._client = client 153 self._credentials = credentials 154 155 def __repr__(self): 156 return f"<{type(self).__name__} {self._url!r} creds={self._credentials!r}>" 157 158 @property 159 def url(self) -> URL: 160 """ 161 S3 Endpoint, see :ref:`urls`. 162 """ 163 return self._url 164 165 def _mkurl(self, path: S3Object|S3Bucket) -> URL: 166 """ 167 Combine a path/s3-url with the endpoint to create the real URL. 168 """ 169 # This method is where host-based buckets would go. 
170 if not path: 171 # Referring to the implicit bucket 172 return self._url 173 elif isinstance(path, str): 174 # Classical string-based "<bucket>/<object>" (or whatever) 175 return self._url.join(path) 176 elif isinstance(path, URL): 177 if path.scheme == "s3" or path.scheme.startswith("s3+") or path.scheme.endswith("+s3"): 178 # s3://<bucket>/<object> 179 # s3+<whatever>://<bucket>/<object> 180 # <whatever>+s3://<bucket>/<object> 181 if path.path in ("", "/"): 182 # No object 183 if path.host: 184 return self._url.join(path.host) 185 else: 186 # Implied bucket 187 return self._url 188 elif not path.path.startswith("/"): 189 raise ValueError(f"Relative S3 URLs not supported ({path!r})") 190 elif not path.host: 191 # Implied bucket 192 assert path.path[0] == "/" 193 return self._url.join(path.path[1:]) 194 else: 195 # Bucket and object given 196 return self._url.join(f"{path.host}{path.path}") 197 elif path.scheme in ("http", "https"): 198 # Full URL, I guess 199 return self._url.join(path) 200 else: 201 raise ValueError(f"Don't know how to make a request URL out of {path!r}") 202 else: 203 raise TypeError(f"Don't know how to make a request URL out of {path!r}") 204 205 async def request( 206 self, method: str, path: S3Object|S3Bucket, 207 headers: t.Optional[HeadersType] = None, 208 params: t.Optional[QueryParams] = None, 209 content: t.Optional[RequestContent] = None, 210 content_sha256: t.Optional[str] = None, 211 **kwargs, 212 ) -> Response: 213 """ 214 Make an arbitrary request. 
215 216 :meta private: 217 """ 218 headers = self._prepare_headers(headers) 219 220 if content is not None and content_sha256 is None: 221 content_sha256 = UNSIGNED_PAYLOAD 222 223 url = self._mkurl(path) 224 if params: 225 url = url.copy_merge_params(params) 226 227 headers = self._make_headers(headers) 228 headers.update( 229 self._credentials.signer.sign_with_headers( 230 method, str(url), headers=headers, content_hash=content_sha256, 231 ), 232 ) 233 resp = await self._client.request( 234 method, url, headers=headers, content=content, **kwargs, 235 ) 236 try: 237 resp.raise_for_status() 238 except HTTPError as herr: 239 # Annotate the original exception with useful information 240 try: 241 herr.add_note(f"URL: {herr.request.url}") 242 except Exception: 243 pass 244 try: 245 herr.add_note(f"Body:\n{textwrap.indent(resp.text, ' ')}") 246 except Exception: 247 pass 248 249 if resp.headers.get('Content-Type', None) in ('application/xml', 'text/xml') and b'<Error>' in resp.content: 250 # It looks like error XML, wrap it up so it's useful 251 try: 252 err = parse_error(resp.content) 253 except ValueError: 254 # Bad XML, skip the wrapping 255 raise herr 256 try: 257 err.request = herr.request 258 except Exception: 259 pass 260 if hasattr(herr, "response"): 261 try: 262 err.response = herr.response 263 except Exception: 264 pass 265 raise err from herr 266 else: 267 raise 268 else: 269 return resp 270
[docs] 271 async def get(self, object_name: S3Object|S3Bucket, **kwargs) -> Response: 272 """ 273 Get an object. 274 275 Returns: 276 : The object gotten. 277 """ 278 return await self.request("GET", object_name, **kwargs)
279
[docs] 280 async def head( 281 self, object_name: S3Object|S3Bucket, 282 content_sha256=EMPTY_STR_HASH, 283 **kwargs, 284 ) -> Response: 285 """ 286 Get an object's headers. 287 288 Returns: 289 : The object gotten. 290 """ 291 return await self.request( 292 "HEAD", object_name, content_sha256=content_sha256, **kwargs, 293 )
294
[docs] 295 async def delete( 296 self, object_name: S3Object|S3Bucket, 297 content_sha256=EMPTY_STR_HASH, 298 **kwargs, 299 ) -> Response: 300 """ 301 Delete an object. 302 """ 303 return await self.request( 304 "DELETE", object_name, content_sha256=content_sha256, **kwargs, 305 )
306 307 @staticmethod 308 def _make_headers(headers: t.Optional[HeadersType]) -> dict: 309 headers = dict(headers or {}) 310 return headers 311 312 def _prepare_headers( 313 self, headers: t.Optional[HeadersType], 314 file_path: str = "", 315 ) -> dict: 316 headers = self._make_headers(headers) 317 318 if HEADERS.CONTENT_TYPE not in headers: 319 content_type = guess_type(file_path)[0] 320 if content_type is None: 321 content_type = "application/octet-stream" 322 323 headers[HEADERS.CONTENT_TYPE] = content_type 324 325 return headers 326
[docs] 327 async def put( 328 self, object_name: S3Object|S3Bucket, 329 content: RequestContent, 330 **kwargs, 331 ) -> Response: 332 """ 333 Update an object. 334 """ 335 return await self.request("PUT", object_name, content=content, **kwargs)
336
[docs] 337 async def post( 338 self, object_name: S3Object|S3Bucket, 339 content: RequestContent = None, 340 **kwargs, 341 ) -> Response: 342 return await self.request("POST", object_name, content=content, **kwargs)
343
[docs] 344 async def put_file( 345 self, object_name: S3Object, 346 file_path: str|os.PathLike, 347 *, headers: t.Optional[HeadersType] = None, 348 chunk_size: int = CHUNK_SIZE, content_sha256: t.Optional[str] = None, 349 ) -> Response: 350 """ 351 Update an object from a file (by path) 352 """ 353 headers = self._prepare_headers(headers, str(file_path)) 354 return await self.put( 355 object_name, 356 headers=headers, 357 content=async_file_sender( 358 file_path, 359 chunk_size=chunk_size, 360 ), 361 content_sha256=content_sha256, 362 )
363 364 @asyncbackoff( 365 None, None, 0, 366 max_tries=3, exceptions=(HTTPError,), 367 ) 368 async def _create_multipart_upload( 369 self, 370 object_name: S3Object, 371 headers: t.Optional[HeadersType] = None, 372 ) -> str: 373 resp = await self.post( 374 object_name, 375 headers=headers, 376 params={"uploads": 1}, 377 content_sha256=EMPTY_STR_HASH, 378 ) 379 payload = resp.read() 380 return parse_create_multipart_upload_id(payload) 381 382 @asyncbackoff( 383 None, None, 0, 384 max_tries=3, exceptions=(S3Error, HTTPError), 385 ) 386 async def _complete_multipart_upload( 387 self, 388 upload_id: str, 389 object_name: S3Object, 390 parts: t.List[t.Tuple[int, str]], 391 ) -> None: 392 complete_upload_request = create_complete_upload_request(parts) 393 resp = await self.post( 394 object_name, 395 headers={"Content-Type": "text/xml"}, 396 params={"uploadId": upload_id}, 397 content=complete_upload_request, 398 content_sha256=hashlib.sha256(complete_upload_request).hexdigest(), 399 ) 400 401 @asyncbackoff( 402 None, None, 0, 403 max_tries=3, exceptions=(S3Error, HTTPError), 404 ) 405 async def _abort_multipart_upload( 406 self, 407 upload_id: str, 408 object_name: S3Object, 409 ) -> None: 410 resp = await self.delete( 411 object_name, 412 params={"uploadId": upload_id}, 413 content_sha256=EMPTY_STR_HASH, 414 ) 415 416 async def _put_part( 417 self, 418 upload_id: str, 419 object_name: S3Object, 420 part_no: int, 421 content: RequestContent, 422 content_sha256: str, 423 **kwargs, 424 ) -> str: 425 resp = await self.put( 426 object_name, 427 params={"partNumber": part_no, "uploadId": upload_id}, 428 content=content, 429 content_sha256=content_sha256, 430 **kwargs, 431 ) 432 return resp.headers["Etag"].strip('"') 433 434 async def _part_uploader( 435 self, 436 upload_id: str, 437 object_name: S3Object, 438 parts_stream: anyio.streams.memory.MemoryObjectReceiveStream[MultiPart], 439 results_queue: deque, 440 part_upload_tries: int, 441 **kwargs, 442 ) -> None: 443 backoff 
= asyncbackoff( 444 None, None, 445 max_tries=part_upload_tries, 446 exceptions=(HTTPError,), 447 ) 448 async for part_no, part_hash, part in parts_stream: 449 etag = await backoff(self._put_part)( 450 upload_id=upload_id, 451 object_name=object_name, 452 part_no=part_no, 453 content=part, 454 content_sha256=part_hash, 455 **kwargs, 456 ) 457 log.debug( 458 "Etag for part %d of %s is %s", part_no, upload_id, etag, 459 ) 460 results_queue.append((part_no, etag)) 461
[docs] 462 async def put_file_multipart( 463 self, 464 object_name: S3Object, 465 file_path: str|os.PathLike, 466 *, 467 headers: t.Optional[HeadersType] = None, 468 part_size: int = PART_SIZE, 469 workers_count: int = 1, 470 max_size: t.Optional[int] = None, 471 part_upload_tries: int = 3, 472 calculate_content_sha256: bool = True, 473 **kwargs, 474 ) -> None: 475 """ 476 Upload data from a file with multipart upload 477 478 Args: 479 object_name: key in s3 480 file_path: path to a file for upload 481 headers: additional headers, such as Content-Type 482 part_size: size of a chunk to send (recommended: >5Mb) 483 workers_count: count of coroutines for asyncronous parts uploading 484 max_size: maximum size of a queue with data to send (should be 485 at least ``workers_count``) 486 part_upload_tries: how many times trying to put part to s3 before fail 487 calculate_content_sha256: whether to calculate sha256 hash of a part 488 for integrity purposes 489 """ 490 log.debug( 491 "Going to multipart upload %s to %s with part size %d", 492 file_path, object_name, part_size, 493 ) 494 await self.put_multipart( 495 object_name, 496 file_sender( 497 file_path, 498 chunk_size=part_size, 499 ), 500 headers=headers, 501 workers_count=workers_count, 502 max_size=max_size, 503 part_upload_tries=part_upload_tries, 504 calculate_content_sha256=calculate_content_sha256, 505 **kwargs, 506 )
507 508 async def _parts_generator( 509 self, gen: t.AsyncIterable[tuple], workers_count: int, parts_stream: anyio.streams.memory.MemoryObjectSendStream[MultiPart], 510 ) -> int: 511 part_no = 1 512 async with parts_stream: 513 async for part_hash, part in gen: 514 log.debug( 515 "Reading part %d (%d bytes)", part_no, len(part), 516 ) 517 await parts_stream.send((part_no, part_hash, part)) 518 part_no += 1 519 520 return part_no 521
[docs] 522 async def put_multipart( 523 self, 524 object_name: S3Object, 525 content: t.Iterable[bytes], 526 *, 527 headers: t.Optional[HeadersType] = None, 528 workers_count: int = 1, 529 max_size: t.Optional[int] = None, 530 part_upload_tries: int = 3, 531 calculate_content_sha256: bool = True, 532 **kwargs, 533 ) -> None: 534 """ 535 Send data from iterable with multipart upload 536 537 Args: 538 object_name: key in s3 539 data: any iterable that returns chunks of bytes 540 headers: additional headers, such as Content-Type 541 workers_count: count of coroutines for asyncronous parts uploading 542 max_size: maximum size of a queue with data to send (should be 543 at least ``workers_count``) 544 part_upload_tries: how many times trying to put part to s3 before fail 545 calculate_content_sha256: whether to calculate sha256 hash of a part 546 for integrity purposes 547 """ 548 if workers_count < 1: 549 raise ValueError( 550 f"Workers count should be > 0. Got {workers_count}", 551 ) 552 max_size = max_size or workers_count 553 554 upload_id = await self._create_multipart_upload( 555 object_name, 556 headers=headers, 557 ) 558 log.debug("Got upload id %s for %s", upload_id, object_name) 559 560 results_queue: deque = deque() 561 try: 562 async with anyio.create_task_group() as tg: 563 send_stream, receive_stream = anyio.create_memory_object_stream() 564 for wid in range(workers_count): 565 tg.start_soon(partial( 566 self._part_uploader, 567 upload_id, 568 object_name, 569 receive_stream.clone(), 570 results_queue, 571 part_upload_tries, 572 **kwargs, 573 ), name=f"put-worker-{upload_id}@{wid}") 574 # Get rid of our copy 575 receive_stream.close() 576 del receive_stream 577 578 if calculate_content_sha256: 579 gen = gen_with_hash(content) 580 else: 581 gen = gen_without_hash(content) 582 583 part_no = await self._parts_generator(gen, workers_count, send_stream) 584 except* Exception as excgroup: 585 for exc in excgroup.exceptions: 586 raise exc from None 587 588 
log.debug( 589 "All parts (#%d) of %s are uploaded to %s", 590 part_no - 1, upload_id, object_name, 591 ) 592 593 # Parts should be in ascending order 594 parts = sorted(results_queue, key=lambda x: x[0]) 595 if parts: 596 await self._complete_multipart_upload( 597 upload_id, object_name, parts, 598 ) 599 else: 600 await self._abort_multipart_upload(upload_id, object_name) 601 await self.put(object_name, b"", **kwargs)
602 603 async def _download_range( 604 self, 605 object_name: S3Object, 606 writer: ChunkSendStream, 607 *, 608 etag: str, 609 pos: int, 610 range_start: int, 611 req_range_start: int, 612 req_range_end: int, 613 buffer_size: int, 614 headers: t.Optional[HeadersType] = None, 615 **kwargs, 616 ) -> None: 617 """ 618 Downloading range [req_range_start:req_range_end] to `file` 619 """ 620 log.debug( 621 "Downloading %s from %d to %d", 622 object_name, 623 req_range_start, 624 req_range_end, 625 ) 626 if not headers: 627 headers = {} 628 headers = headers.copy() 629 headers["Range"] = f"bytes={req_range_start}-{req_range_end}" 630 headers["If-Match"] = etag 631 632 resp = await self.get(object_name, headers=headers, **kwargs) 633 if resp.status_code not in (HTTPStatus.PARTIAL_CONTENT, HTTPStatus.OK): 634 raise HTTPStatusError( 635 f"Got wrong status code {resp.status_code} on range download " 636 f"of {object_name}", 637 request=resp.request, response=resp 638 ) 639 assert 'Content-Range' in resp.headers 640 assert resp.headers['Content-Range'].startswith(f"bytes {req_range_start}-{req_range_end}/") 641 # FIXME: Handle OK 642 # FIXME: Handle Content-Range being different from requested 643 pos = req_range_start 644 async for chunk in resp.aiter_bytes(buffer_size): 645 if not chunk: 646 break 647 await writer.send((pos, chunk)) 648 pos += len(chunk) 649 650 async def _download_worker( 651 self, 652 object_name: S3Object, 653 writer: ChunkSendStream, 654 *, 655 etag: str, 656 range_step: int, 657 range_start: int, 658 range_end: int, 659 buffer_size: int, 660 range_get_tries: int = 3, 661 headers: t.Optional[HeadersType] = None, 662 **kwargs, 663 ) -> None: 664 """ 665 Downloads data in range `[range_start, range_end)` 666 with step `range_step` to file `file_path`. 667 Uses `etag` to make sure that file wasn't changed in the process. 
668 """ 669 log.debug( 670 "Starting download worker for range [%d:%d]", 671 range_start, 672 range_end, 673 ) 674 async with writer: 675 backoff = asyncbackoff( 676 None, None, 677 max_tries=range_get_tries, 678 exceptions=(HTTPError,), 679 ) 680 req_range_end = range_start 681 for req_range_start in range(range_start, range_end, range_step): 682 req_range_end += range_step 683 if req_range_end > range_end: 684 req_range_end = range_end 685 await backoff(self._download_range)( 686 object_name, 687 writer, 688 etag=etag, 689 pos=(req_range_start - range_start), 690 range_start=range_start, 691 req_range_start=req_range_start, 692 req_range_end=req_range_end - 1, 693 buffer_size=buffer_size, 694 headers=headers, 695 **kwargs, 696 ) 697
[docs] 698 async def get_file_parallel( 699 self, 700 object_name: S3Object, 701 file_path: str|os.PathLike, 702 *, 703 headers: t.Optional[HeadersType] = None, 704 range_step: int = PART_SIZE, 705 workers_count: int = 1, 706 range_get_tries: int = 3, 707 buffer_size: int = PAGESIZE * 32, 708 **kwargs, 709 ) -> None: 710 """ 711 Download object in parallel with requests with Range. If file changed 712 while download is in progress, an error will be raised. 713 714 Args: 715 object_name: s3 key to download 716 file_path: target file path 717 headers: additional headers 718 range_step: how much data will be downloaded in single HTTP request 719 workers_count: count of parallel workers 720 range_get_tries: count of tries to download each range 721 buffer_size: size of a buffer for on the fly data 722 """ 723 file_path = anyio.Path(file_path) 724 resp = await self.head(object_name, headers=headers) 725 726 etag = resp.headers["Etag"] 727 file_size = int(resp.headers["Content-Length"]) 728 log.debug( 729 "Object's %s etag is %s and size is %d", 730 object_name, 731 etag, 732 file_size, 733 ) 734 735 if file_size == 0: 736 await file_path.touch() 737 return 738 739 worker_range_size = file_size // workers_count 740 range_end = 0 741 try: 742 try: 743 async with ( 744 await anyio.open_file(file_path, "w+b") as fp, 745 parallel_file_writer(fp) as pfw, 746 anyio.create_task_group() as tg, 747 ): 748 for range_start in range(0, file_size, worker_range_size): 749 range_end += worker_range_size 750 if range_end > file_size: 751 range_end = file_size 752 tg.start_soon(partial( 753 self._download_worker, 754 object_name, 755 await pfw.get_block(range_start, range_end), 756 buffer_size=buffer_size, 757 etag=etag, 758 headers=headers, 759 range_end=range_end, 760 range_get_tries=range_get_tries, 761 range_start=range_start, 762 range_step=range_step, 763 **kwargs, 764 ), name=f"get-worker@{range_start}") 765 except* PreconditionFailed as excgroup: 766 raise ConflictError(f"Object 
{object_name!r} changed while downloading") from excgroup 767 except* Exception as excgroup: 768 # Unwrap and raise just one 769 for exc in excgroup.exceptions: 770 raise exc from None 771 772 except Exception: 773 log.exception( 774 "Error on file download. Removing possibly incomplete file %s", 775 file_path, 776 ) 777 with suppress(FileNotFoundError): 778 await file_path.unlink() 779 raise
780
[docs] 781 async def list_objects_v2( 782 self, 783 object_name: str|URL|None = None, 784 *, 785 bucket: S3Bucket|None = None, 786 prefix: t.Optional[t.Union[str, Path]] = None, 787 delimiter: t.Optional[str] = None, 788 max_keys: t.Optional[int] = None, 789 start_after: t.Optional[str] = None, 790 ) -> t.AsyncIterator[t.List[AwsObjectMeta]]: 791 """ 792 List objects in bucket. 793 794 Args: 795 object_name: 796 path to listing endpoint, defaults to ``'/'``; a ``bucket`` value is 797 prepended to this value if provided. 798 prefix: 799 limits the response to keys that begin with the specified 800 prefix 801 delimiter: a delimiter is a character you use to group keys 802 max_keys: maximum number of keys returned in the response 803 start_after: keys to start listing after 804 805 Returns: 806 : An iterator over lists of metadata objects, each corresponding 807 to an individual response result (typically limited to 1000 keys). 808 809 .. note:: 810 811 Despite the name :meth:`list_objects` is newer and probably more ergonomic. 812 """ 813 814 params = { 815 "list-type": "2", 816 } 817 818 if prefix: 819 params["prefix"] = str(prefix) 820 821 if delimiter: 822 params["delimiter"] = delimiter 823 824 if max_keys: 825 params["max-keys"] = str(max_keys) 826 827 if start_after: 828 params["start-after"] = start_after 829 830 if bucket and object_name: 831 raise ValueError("Must give either bucket or object_name") 832 833 while True: 834 resp = await self.get(bucket or object_name or '', params=params, headers={'Accept': 'application/xml'}) 835 payload = resp.content 836 metadata, continuation_token = parse_list_objects(payload) 837 if not metadata: 838 break 839 yield metadata 840 if not continuation_token: 841 break 842 params["continuation-token"] = continuation_token
843
[docs] 844 async def list_objects( 845 self, 846 bucket: S3Bucket|None=None, 847 *, 848 prefix: str|None = None, 849 delimiter: t.Optional[str] = None, 850 max_keys: t.Optional[int] = None, 851 start_after: t.Optional[str] = None, 852 ) -> t.AsyncIterator[AwsObjectMeta]: 853 """ 854 List objects in bucket. 855 856 Args: 857 bucket: 858 Name of the bucket to list, or :data:`None` if the bucket is in the endpoint. 859 prefix: 860 limits the response to keys that begin with the specified 861 prefix 862 delimiter: a delimiter is a character you use to group keys 863 max_keys: maximum number of keys returned in the response 864 start_after: keys to start listing after 865 866 Returns: 867 : Iterator over metadata objects 868 """ 869 params = { 870 "list-type": "2", 871 } 872 873 if prefix: 874 params["prefix"] = str(prefix) 875 876 if delimiter: 877 params["delimiter"] = delimiter 878 879 if max_keys: 880 params["max-keys"] = str(max_keys) 881 882 if start_after: 883 params["start-after"] = start_after 884 885 while True: 886 resp = await self.get(bucket or '', params=params, headers={'Accept': 'application/xml'}) 887 payload = resp.content 888 metadata, continuation_token = parse_list_objects(payload) 889 if not metadata: 890 break 891 for objmeta in metadata: 892 yield objmeta 893 if not continuation_token: 894 break 895 params["continuation-token"] = continuation_token