# AWS S3 API Reference: https://docs.aws.amazon.com/AmazonS3/latest/API/Type_API_Reference.html
# Probably useful to anyone working on this module
3
4import hashlib
5import io
6import logging
7import os
8import typing as t
9from collections import deque
10from contextlib import suppress
11from dataclasses import dataclass
12from functools import partial
13from http import HTTPStatus
14from itertools import chain
15from mimetypes import guess_type
16from mmap import PAGESIZE
17from pathlib import Path
18from tempfile import NamedTemporaryFile
19import textwrap
20
21import anyio
22import anyio.streams.memory
23from anyiomisc import asyncbackoff
24from aws_request_signer import UNSIGNED_PAYLOAD
25from httpx import URL, HTTPError, AsyncClient, Response, QueryParams, HTTPStatusError
26
27from ._async import generate_in_thread, run_in_thread, parallel_file_writer, ChunkSendStream
28from ._xml import (
29 AwsObjectMeta, create_complete_upload_request,
30 parse_create_multipart_upload_id, parse_list_objects, parse_error,
31)
32from .credentials import (
33 AbstractCredentials, collect_credentials,
34)
35from .exceptions import S3Error, ConflictError, PreconditionFailed
36
37
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Default chunk size (64 KiB) used when streaming a file as a request body.
CHUNK_SIZE = 64 * 1024

# Unique sentinel object.
DONE = object()
# Hex SHA-256 digest of an empty payload; passed as ``content_sha256`` for
# requests that carry no body.
EMPTY_STR_HASH = hashlib.sha256().hexdigest()

# 5MB: default multipart part size and default download range step.
PART_SIZE = 5 * 1024 ** 2
# Request headers are handled as plain dictionaries.
HeadersType = t.Dict

# Optional mapping of string keys to arbitrary values.
DataType = t.Optional[t.Mapping[str, t.Any]]
# Anything accepted as a request body: text, bytes, or a (sync/async)
# iterable of byte chunks; ``None`` means "no body".
RequestContent = t.Union[
    str, bytes, t.Iterable[bytes], t.AsyncIterable[bytes], None,
]
# A single scalar query-parameter value.
PrimitiveData = t.Union[str, int, float, bool, None]
# Every shape in which query parameters may be supplied.
QueryParamTypes = t.Union[
    QueryParams,
    t.Mapping[str, t.Union[PrimitiveData, t.Sequence[PrimitiveData]]],
    t.List[t.Tuple[str, PrimitiveData]],
    t.Tuple[t.Tuple[str, PrimitiveData], ...],
    str,
    bytes,
]
60
61
@dataclass
class HEADERS:
    """Spellings of the HTTP header names referenced by this module."""

    # Both names are class-level constants, not per-instance fields.
    CONTENT_LENGTH: t.ClassVar[str] = "Content-Length"
    CONTENT_TYPE: t.ClassVar[str] = "Content-Type"
66
67
@run_in_thread
def concat_files(
    target_file: Path, files: t.List[t.IO[bytes]], buffer_size: int,
) -> None:
    """
    Append the contents of ``files``, in order, to ``target_file``.

    Every source file is rewound to its start, copied in reads of
    ``buffer_size`` bytes and then closed.

    Args:
        target_file: file to append to (opened in ``"ab"`` mode)
        files: open binary files to copy from; this function closes them
        buffer_size: number of bytes requested per read
    """
    try:
        with target_file.open("ab") as fp:
            for file in files:
                file.seek(0)
                while chunk := file.read(buffer_size):
                    fp.write(chunk)
                # Release each source as soon as it has been copied.
                file.close()
    finally:
        # If anything above raised, the remaining sources would otherwise
        # stay open; closing an already-closed file is a no-op.
        for file in files:
            file.close()
81
82
@run_in_thread
def write_from_start(
    file: io.BytesIO, chunk: bytes, range_start: int, pos: int,
) -> None:
    """
    Write ``chunk`` into ``file`` at the offset ``pos - range_start``.

    Args:
        file: seekable binary buffer to write into
        chunk: bytes to write
        range_start: position that corresponds to offset 0 of ``file``
        pos: position of ``chunk``, on the same scale as ``range_start``
    """
    offset = pos - range_start
    file.seek(offset)
    file.write(chunk)
89
90
@generate_in_thread
def gen_without_hash(
    stream: t.Iterable[bytes],
) -> t.Generator[t.Tuple[None, bytes], None, None]:
    """
    Pair every chunk of ``stream`` with ``None`` in place of a hash.

    Yields:
        ``(None, chunk)`` for each chunk, in order.
    """
    yield from ((None, chunk) for chunk in stream)
97
98
@generate_in_thread
def gen_with_hash(
    stream: t.Iterable[bytes],
) -> t.Generator[t.Tuple[str, bytes], None, None]:
    """
    Pair every chunk of ``stream`` with its SHA-256 hex digest.

    Yields:
        ``(hex_digest, chunk)`` for each chunk, in order.
    """
    for chunk in stream:
        digest = hashlib.sha256(chunk).hexdigest()
        yield digest, chunk
105
106
def file_sender(
    file_name: str|os.PathLike, chunk_size: int = CHUNK_SIZE,
) -> t.Iterable[bytes]:
    """
    Lazily read a file and yield its contents in blocks.

    Args:
        file_name: path of the file to read (opened in binary mode)
        chunk_size: number of bytes requested per read

    Yields:
        Consecutive non-empty blocks of the file; iteration ends at the
        first empty read.
    """
    with open(file_name, "rb") as fp:
        while block := fp.read(chunk_size):
            yield block
116
117
# ``file_sender`` wrapped with ``generate_in_thread``; used as the request
# body by ``S3Client.put_file``.
# NOTE(review): presumably iterates the generator in a worker thread and
# exposes it as an async iterable - confirm against ``._async``.
async_file_sender = generate_in_thread(file_sender)

# Item type of the stream between the part generator and the upload workers;
# workers unpack each item as ``(part_no, part_hash, part)``.
MultiPart = tuple

#: Type hint for anything expecting a specific object
type S3Object = str | URL
#: Type hint for anything expecting a bucket
type S3Bucket = str | URL
126
[docs]
127class S3Client:
128 def __init__(
129 self, client: AsyncClient, url: t.Union[URL, str],
130 secret_access_key: t.Optional[str] = None,
131 access_key_id: t.Optional[str] = None,
132 session_token: t.Optional[str] = None,
133 region: str = "",
134 credentials: t.Optional[AbstractCredentials] = None,
135 ):
136 url = URL(url)
137 if credentials is None:
138 credentials = collect_credentials(
139 url=url,
140 access_key_id=access_key_id,
141 region=region,
142 secret_access_key=secret_access_key,
143 session_token=session_token,
144 )
145
146 if not credentials:
147 raise ValueError(
148 f"Credentials {credentials!r} is incomplete",
149 )
150
151 self._url = url
152 self._client = client
153 self._credentials = credentials
154
155 def __repr__(self):
156 return f"<{type(self).__name__} {self._url!r} creds={self._credentials!r}>"
157
158 @property
159 def url(self) -> URL:
160 """
161 S3 Endpoint, see :ref:`urls`.
162 """
163 return self._url
164
165 def _mkurl(self, path: S3Object|S3Bucket) -> URL:
166 """
167 Combine a path/s3-url with the endpoint to create the real URL.
168 """
169 # This method is where host-based buckets would go.
170 if not path:
171 # Referring to the implicit bucket
172 return self._url
173 elif isinstance(path, str):
174 # Classical string-based "<bucket>/<object>" (or whatever)
175 return self._url.join(path)
176 elif isinstance(path, URL):
177 if path.scheme == "s3" or path.scheme.startswith("s3+") or path.scheme.endswith("+s3"):
178 # s3://<bucket>/<object>
179 # s3+<whatever>://<bucket>/<object>
180 # <whatever>+s3://<bucket>/<object>
181 if path.path in ("", "/"):
182 # No object
183 if path.host:
184 return self._url.join(path.host)
185 else:
186 # Implied bucket
187 return self._url
188 elif not path.path.startswith("/"):
189 raise ValueError(f"Relative S3 URLs not supported ({path!r})")
190 elif not path.host:
191 # Implied bucket
192 assert path.path[0] == "/"
193 return self._url.join(path.path[1:])
194 else:
195 # Bucket and object given
196 return self._url.join(f"{path.host}{path.path}")
197 elif path.scheme in ("http", "https"):
198 # Full URL, I guess
199 return self._url.join(path)
200 else:
201 raise ValueError(f"Don't know how to make a request URL out of {path!r}")
202 else:
203 raise TypeError(f"Don't know how to make a request URL out of {path!r}")
204
205 async def request(
206 self, method: str, path: S3Object|S3Bucket,
207 headers: t.Optional[HeadersType] = None,
208 params: t.Optional[QueryParams] = None,
209 content: t.Optional[RequestContent] = None,
210 content_sha256: t.Optional[str] = None,
211 **kwargs,
212 ) -> Response:
213 """
214 Make an arbitrary request.
215
216 :meta private:
217 """
218 headers = self._prepare_headers(headers)
219
220 if content is not None and content_sha256 is None:
221 content_sha256 = UNSIGNED_PAYLOAD
222
223 url = self._mkurl(path)
224 if params:
225 url = url.copy_merge_params(params)
226
227 headers = self._make_headers(headers)
228 headers.update(
229 self._credentials.signer.sign_with_headers(
230 method, str(url), headers=headers, content_hash=content_sha256,
231 ),
232 )
233 resp = await self._client.request(
234 method, url, headers=headers, content=content, **kwargs,
235 )
236 try:
237 resp.raise_for_status()
238 except HTTPError as herr:
239 # Annotate the original exception with useful information
240 try:
241 herr.add_note(f"URL: {herr.request.url}")
242 except Exception:
243 pass
244 try:
245 herr.add_note(f"Body:\n{textwrap.indent(resp.text, ' ')}")
246 except Exception:
247 pass
248
249 if resp.headers.get('Content-Type', None) in ('application/xml', 'text/xml') and b'<Error>' in resp.content:
250 # It looks like error XML, wrap it up so it's useful
251 try:
252 err = parse_error(resp.content)
253 except ValueError:
254 # Bad XML, skip the wrapping
255 raise herr
256 try:
257 err.request = herr.request
258 except Exception:
259 pass
260 if hasattr(herr, "response"):
261 try:
262 err.response = herr.response
263 except Exception:
264 pass
265 raise err from herr
266 else:
267 raise
268 else:
269 return resp
270
[docs]
271 async def get(self, object_name: S3Object|S3Bucket, **kwargs) -> Response:
272 """
273 Get an object.
274
275 Returns:
276 : The object gotten.
277 """
278 return await self.request("GET", object_name, **kwargs)
279
[docs]
280 async def head(
281 self, object_name: S3Object|S3Bucket,
282 content_sha256=EMPTY_STR_HASH,
283 **kwargs,
284 ) -> Response:
285 """
286 Get an object's headers.
287
288 Returns:
289 : The object gotten.
290 """
291 return await self.request(
292 "HEAD", object_name, content_sha256=content_sha256, **kwargs,
293 )
294
[docs]
295 async def delete(
296 self, object_name: S3Object|S3Bucket,
297 content_sha256=EMPTY_STR_HASH,
298 **kwargs,
299 ) -> Response:
300 """
301 Delete an object.
302 """
303 return await self.request(
304 "DELETE", object_name, content_sha256=content_sha256, **kwargs,
305 )
306
307 @staticmethod
308 def _make_headers(headers: t.Optional[HeadersType]) -> dict:
309 headers = dict(headers or {})
310 return headers
311
312 def _prepare_headers(
313 self, headers: t.Optional[HeadersType],
314 file_path: str = "",
315 ) -> dict:
316 headers = self._make_headers(headers)
317
318 if HEADERS.CONTENT_TYPE not in headers:
319 content_type = guess_type(file_path)[0]
320 if content_type is None:
321 content_type = "application/octet-stream"
322
323 headers[HEADERS.CONTENT_TYPE] = content_type
324
325 return headers
326
[docs]
327 async def put(
328 self, object_name: S3Object|S3Bucket,
329 content: RequestContent,
330 **kwargs,
331 ) -> Response:
332 """
333 Update an object.
334 """
335 return await self.request("PUT", object_name, content=content, **kwargs)
336
[docs]
337 async def post(
338 self, object_name: S3Object|S3Bucket,
339 content: RequestContent = None,
340 **kwargs,
341 ) -> Response:
342 return await self.request("POST", object_name, content=content, **kwargs)
343
[docs]
344 async def put_file(
345 self, object_name: S3Object,
346 file_path: str|os.PathLike,
347 *, headers: t.Optional[HeadersType] = None,
348 chunk_size: int = CHUNK_SIZE, content_sha256: t.Optional[str] = None,
349 ) -> Response:
350 """
351 Update an object from a file (by path)
352 """
353 headers = self._prepare_headers(headers, str(file_path))
354 return await self.put(
355 object_name,
356 headers=headers,
357 content=async_file_sender(
358 file_path,
359 chunk_size=chunk_size,
360 ),
361 content_sha256=content_sha256,
362 )
363
364 @asyncbackoff(
365 None, None, 0,
366 max_tries=3, exceptions=(HTTPError,),
367 )
368 async def _create_multipart_upload(
369 self,
370 object_name: S3Object,
371 headers: t.Optional[HeadersType] = None,
372 ) -> str:
373 resp = await self.post(
374 object_name,
375 headers=headers,
376 params={"uploads": 1},
377 content_sha256=EMPTY_STR_HASH,
378 )
379 payload = resp.read()
380 return parse_create_multipart_upload_id(payload)
381
382 @asyncbackoff(
383 None, None, 0,
384 max_tries=3, exceptions=(S3Error, HTTPError),
385 )
386 async def _complete_multipart_upload(
387 self,
388 upload_id: str,
389 object_name: S3Object,
390 parts: t.List[t.Tuple[int, str]],
391 ) -> None:
392 complete_upload_request = create_complete_upload_request(parts)
393 resp = await self.post(
394 object_name,
395 headers={"Content-Type": "text/xml"},
396 params={"uploadId": upload_id},
397 content=complete_upload_request,
398 content_sha256=hashlib.sha256(complete_upload_request).hexdigest(),
399 )
400
401 @asyncbackoff(
402 None, None, 0,
403 max_tries=3, exceptions=(S3Error, HTTPError),
404 )
405 async def _abort_multipart_upload(
406 self,
407 upload_id: str,
408 object_name: S3Object,
409 ) -> None:
410 resp = await self.delete(
411 object_name,
412 params={"uploadId": upload_id},
413 content_sha256=EMPTY_STR_HASH,
414 )
415
416 async def _put_part(
417 self,
418 upload_id: str,
419 object_name: S3Object,
420 part_no: int,
421 content: RequestContent,
422 content_sha256: str,
423 **kwargs,
424 ) -> str:
425 resp = await self.put(
426 object_name,
427 params={"partNumber": part_no, "uploadId": upload_id},
428 content=content,
429 content_sha256=content_sha256,
430 **kwargs,
431 )
432 return resp.headers["Etag"].strip('"')
433
434 async def _part_uploader(
435 self,
436 upload_id: str,
437 object_name: S3Object,
438 parts_stream: anyio.streams.memory.MemoryObjectReceiveStream[MultiPart],
439 results_queue: deque,
440 part_upload_tries: int,
441 **kwargs,
442 ) -> None:
443 backoff = asyncbackoff(
444 None, None,
445 max_tries=part_upload_tries,
446 exceptions=(HTTPError,),
447 )
448 async for part_no, part_hash, part in parts_stream:
449 etag = await backoff(self._put_part)(
450 upload_id=upload_id,
451 object_name=object_name,
452 part_no=part_no,
453 content=part,
454 content_sha256=part_hash,
455 **kwargs,
456 )
457 log.debug(
458 "Etag for part %d of %s is %s", part_no, upload_id, etag,
459 )
460 results_queue.append((part_no, etag))
461
[docs]
462 async def put_file_multipart(
463 self,
464 object_name: S3Object,
465 file_path: str|os.PathLike,
466 *,
467 headers: t.Optional[HeadersType] = None,
468 part_size: int = PART_SIZE,
469 workers_count: int = 1,
470 max_size: t.Optional[int] = None,
471 part_upload_tries: int = 3,
472 calculate_content_sha256: bool = True,
473 **kwargs,
474 ) -> None:
475 """
476 Upload data from a file with multipart upload
477
478 Args:
479 object_name: key in s3
480 file_path: path to a file for upload
481 headers: additional headers, such as Content-Type
482 part_size: size of a chunk to send (recommended: >5Mb)
483 workers_count: count of coroutines for asyncronous parts uploading
484 max_size: maximum size of a queue with data to send (should be
485 at least ``workers_count``)
486 part_upload_tries: how many times trying to put part to s3 before fail
487 calculate_content_sha256: whether to calculate sha256 hash of a part
488 for integrity purposes
489 """
490 log.debug(
491 "Going to multipart upload %s to %s with part size %d",
492 file_path, object_name, part_size,
493 )
494 await self.put_multipart(
495 object_name,
496 file_sender(
497 file_path,
498 chunk_size=part_size,
499 ),
500 headers=headers,
501 workers_count=workers_count,
502 max_size=max_size,
503 part_upload_tries=part_upload_tries,
504 calculate_content_sha256=calculate_content_sha256,
505 **kwargs,
506 )
507
508 async def _parts_generator(
509 self, gen: t.AsyncIterable[tuple], workers_count: int, parts_stream: anyio.streams.memory.MemoryObjectSendStream[MultiPart],
510 ) -> int:
511 part_no = 1
512 async with parts_stream:
513 async for part_hash, part in gen:
514 log.debug(
515 "Reading part %d (%d bytes)", part_no, len(part),
516 )
517 await parts_stream.send((part_no, part_hash, part))
518 part_no += 1
519
520 return part_no
521
[docs]
522 async def put_multipart(
523 self,
524 object_name: S3Object,
525 content: t.Iterable[bytes],
526 *,
527 headers: t.Optional[HeadersType] = None,
528 workers_count: int = 1,
529 max_size: t.Optional[int] = None,
530 part_upload_tries: int = 3,
531 calculate_content_sha256: bool = True,
532 **kwargs,
533 ) -> None:
534 """
535 Send data from iterable with multipart upload
536
537 Args:
538 object_name: key in s3
539 data: any iterable that returns chunks of bytes
540 headers: additional headers, such as Content-Type
541 workers_count: count of coroutines for asyncronous parts uploading
542 max_size: maximum size of a queue with data to send (should be
543 at least ``workers_count``)
544 part_upload_tries: how many times trying to put part to s3 before fail
545 calculate_content_sha256: whether to calculate sha256 hash of a part
546 for integrity purposes
547 """
548 if workers_count < 1:
549 raise ValueError(
550 f"Workers count should be > 0. Got {workers_count}",
551 )
552 max_size = max_size or workers_count
553
554 upload_id = await self._create_multipart_upload(
555 object_name,
556 headers=headers,
557 )
558 log.debug("Got upload id %s for %s", upload_id, object_name)
559
560 results_queue: deque = deque()
561 try:
562 async with anyio.create_task_group() as tg:
563 send_stream, receive_stream = anyio.create_memory_object_stream()
564 for wid in range(workers_count):
565 tg.start_soon(partial(
566 self._part_uploader,
567 upload_id,
568 object_name,
569 receive_stream.clone(),
570 results_queue,
571 part_upload_tries,
572 **kwargs,
573 ), name=f"put-worker-{upload_id}@{wid}")
574 # Get rid of our copy
575 receive_stream.close()
576 del receive_stream
577
578 if calculate_content_sha256:
579 gen = gen_with_hash(content)
580 else:
581 gen = gen_without_hash(content)
582
583 part_no = await self._parts_generator(gen, workers_count, send_stream)
584 except* Exception as excgroup:
585 for exc in excgroup.exceptions:
586 raise exc from None
587
588 log.debug(
589 "All parts (#%d) of %s are uploaded to %s",
590 part_no - 1, upload_id, object_name,
591 )
592
593 # Parts should be in ascending order
594 parts = sorted(results_queue, key=lambda x: x[0])
595 if parts:
596 await self._complete_multipart_upload(
597 upload_id, object_name, parts,
598 )
599 else:
600 await self._abort_multipart_upload(upload_id, object_name)
601 await self.put(object_name, b"", **kwargs)
602
603 async def _download_range(
604 self,
605 object_name: S3Object,
606 writer: ChunkSendStream,
607 *,
608 etag: str,
609 pos: int,
610 range_start: int,
611 req_range_start: int,
612 req_range_end: int,
613 buffer_size: int,
614 headers: t.Optional[HeadersType] = None,
615 **kwargs,
616 ) -> None:
617 """
618 Downloading range [req_range_start:req_range_end] to `file`
619 """
620 log.debug(
621 "Downloading %s from %d to %d",
622 object_name,
623 req_range_start,
624 req_range_end,
625 )
626 if not headers:
627 headers = {}
628 headers = headers.copy()
629 headers["Range"] = f"bytes={req_range_start}-{req_range_end}"
630 headers["If-Match"] = etag
631
632 resp = await self.get(object_name, headers=headers, **kwargs)
633 if resp.status_code not in (HTTPStatus.PARTIAL_CONTENT, HTTPStatus.OK):
634 raise HTTPStatusError(
635 f"Got wrong status code {resp.status_code} on range download "
636 f"of {object_name}",
637 request=resp.request, response=resp
638 )
639 assert 'Content-Range' in resp.headers
640 assert resp.headers['Content-Range'].startswith(f"bytes {req_range_start}-{req_range_end}/")
641 # FIXME: Handle OK
642 # FIXME: Handle Content-Range being different from requested
643 pos = req_range_start
644 async for chunk in resp.aiter_bytes(buffer_size):
645 if not chunk:
646 break
647 await writer.send((pos, chunk))
648 pos += len(chunk)
649
650 async def _download_worker(
651 self,
652 object_name: S3Object,
653 writer: ChunkSendStream,
654 *,
655 etag: str,
656 range_step: int,
657 range_start: int,
658 range_end: int,
659 buffer_size: int,
660 range_get_tries: int = 3,
661 headers: t.Optional[HeadersType] = None,
662 **kwargs,
663 ) -> None:
664 """
665 Downloads data in range `[range_start, range_end)`
666 with step `range_step` to file `file_path`.
667 Uses `etag` to make sure that file wasn't changed in the process.
668 """
669 log.debug(
670 "Starting download worker for range [%d:%d]",
671 range_start,
672 range_end,
673 )
674 async with writer:
675 backoff = asyncbackoff(
676 None, None,
677 max_tries=range_get_tries,
678 exceptions=(HTTPError,),
679 )
680 req_range_end = range_start
681 for req_range_start in range(range_start, range_end, range_step):
682 req_range_end += range_step
683 if req_range_end > range_end:
684 req_range_end = range_end
685 await backoff(self._download_range)(
686 object_name,
687 writer,
688 etag=etag,
689 pos=(req_range_start - range_start),
690 range_start=range_start,
691 req_range_start=req_range_start,
692 req_range_end=req_range_end - 1,
693 buffer_size=buffer_size,
694 headers=headers,
695 **kwargs,
696 )
697
[docs]
698 async def get_file_parallel(
699 self,
700 object_name: S3Object,
701 file_path: str|os.PathLike,
702 *,
703 headers: t.Optional[HeadersType] = None,
704 range_step: int = PART_SIZE,
705 workers_count: int = 1,
706 range_get_tries: int = 3,
707 buffer_size: int = PAGESIZE * 32,
708 **kwargs,
709 ) -> None:
710 """
711 Download object in parallel with requests with Range. If file changed
712 while download is in progress, an error will be raised.
713
714 Args:
715 object_name: s3 key to download
716 file_path: target file path
717 headers: additional headers
718 range_step: how much data will be downloaded in single HTTP request
719 workers_count: count of parallel workers
720 range_get_tries: count of tries to download each range
721 buffer_size: size of a buffer for on the fly data
722 """
723 file_path = anyio.Path(file_path)
724 resp = await self.head(object_name, headers=headers)
725
726 etag = resp.headers["Etag"]
727 file_size = int(resp.headers["Content-Length"])
728 log.debug(
729 "Object's %s etag is %s and size is %d",
730 object_name,
731 etag,
732 file_size,
733 )
734
735 if file_size == 0:
736 await file_path.touch()
737 return
738
739 worker_range_size = file_size // workers_count
740 range_end = 0
741 try:
742 try:
743 async with (
744 await anyio.open_file(file_path, "w+b") as fp,
745 parallel_file_writer(fp) as pfw,
746 anyio.create_task_group() as tg,
747 ):
748 for range_start in range(0, file_size, worker_range_size):
749 range_end += worker_range_size
750 if range_end > file_size:
751 range_end = file_size
752 tg.start_soon(partial(
753 self._download_worker,
754 object_name,
755 await pfw.get_block(range_start, range_end),
756 buffer_size=buffer_size,
757 etag=etag,
758 headers=headers,
759 range_end=range_end,
760 range_get_tries=range_get_tries,
761 range_start=range_start,
762 range_step=range_step,
763 **kwargs,
764 ), name=f"get-worker@{range_start}")
765 except* PreconditionFailed as excgroup:
766 raise ConflictError(f"Object {object_name!r} changed while downloading") from excgroup
767 except* Exception as excgroup:
768 # Unwrap and raise just one
769 for exc in excgroup.exceptions:
770 raise exc from None
771
772 except Exception:
773 log.exception(
774 "Error on file download. Removing possibly incomplete file %s",
775 file_path,
776 )
777 with suppress(FileNotFoundError):
778 await file_path.unlink()
779 raise
780
[docs]
781 async def list_objects_v2(
782 self,
783 object_name: str|URL|None = None,
784 *,
785 bucket: S3Bucket|None = None,
786 prefix: t.Optional[t.Union[str, Path]] = None,
787 delimiter: t.Optional[str] = None,
788 max_keys: t.Optional[int] = None,
789 start_after: t.Optional[str] = None,
790 ) -> t.AsyncIterator[t.List[AwsObjectMeta]]:
791 """
792 List objects in bucket.
793
794 Args:
795 object_name:
796 path to listing endpoint, defaults to ``'/'``; a ``bucket`` value is
797 prepended to this value if provided.
798 prefix:
799 limits the response to keys that begin with the specified
800 prefix
801 delimiter: a delimiter is a character you use to group keys
802 max_keys: maximum number of keys returned in the response
803 start_after: keys to start listing after
804
805 Returns:
806 : An iterator over lists of metadata objects, each corresponding
807 to an individual response result (typically limited to 1000 keys).
808
809 .. note::
810
811 Despite the name :meth:`list_objects` is newer and probably more ergonomic.
812 """
813
814 params = {
815 "list-type": "2",
816 }
817
818 if prefix:
819 params["prefix"] = str(prefix)
820
821 if delimiter:
822 params["delimiter"] = delimiter
823
824 if max_keys:
825 params["max-keys"] = str(max_keys)
826
827 if start_after:
828 params["start-after"] = start_after
829
830 if bucket and object_name:
831 raise ValueError("Must give either bucket or object_name")
832
833 while True:
834 resp = await self.get(bucket or object_name or '', params=params, headers={'Accept': 'application/xml'})
835 payload = resp.content
836 metadata, continuation_token = parse_list_objects(payload)
837 if not metadata:
838 break
839 yield metadata
840 if not continuation_token:
841 break
842 params["continuation-token"] = continuation_token
843
[docs]
844 async def list_objects(
845 self,
846 bucket: S3Bucket|None=None,
847 *,
848 prefix: str|None = None,
849 delimiter: t.Optional[str] = None,
850 max_keys: t.Optional[int] = None,
851 start_after: t.Optional[str] = None,
852 ) -> t.AsyncIterator[AwsObjectMeta]:
853 """
854 List objects in bucket.
855
856 Args:
857 bucket:
858 Name of the bucket to list, or :data:`None` if the bucket is in the endpoint.
859 prefix:
860 limits the response to keys that begin with the specified
861 prefix
862 delimiter: a delimiter is a character you use to group keys
863 max_keys: maximum number of keys returned in the response
864 start_after: keys to start listing after
865
866 Returns:
867 : Iterator over metadata objects
868 """
869 params = {
870 "list-type": "2",
871 }
872
873 if prefix:
874 params["prefix"] = str(prefix)
875
876 if delimiter:
877 params["delimiter"] = delimiter
878
879 if max_keys:
880 params["max-keys"] = str(max_keys)
881
882 if start_after:
883 params["start-after"] = start_after
884
885 while True:
886 resp = await self.get(bucket or '', params=params, headers={'Accept': 'application/xml'})
887 payload = resp.content
888 metadata, continuation_token = parse_list_objects(payload)
889 if not metadata:
890 break
891 for objmeta in metadata:
892 yield objmeta
893 if not continuation_token:
894 break
895 params["continuation-token"] = continuation_token