1# AWS S3 API Reference: https://docs.aws.amazon.com/AmazonS3/latest/API/Type_API_Reference.html
2# Probably useful to anyone working on this module
3
4import hashlib
5import io
6import logging
7import os
8import typing as t
9from collections import deque
10from contextlib import suppress
11from dataclasses import dataclass
12from functools import partial
13from http import HTTPStatus
14from itertools import chain
15from mimetypes import guess_type
16from mmap import PAGESIZE
17from pathlib import Path
18from tempfile import NamedTemporaryFile
19import textwrap
20
21import anyio
22import anyio.streams.memory
23from anyiomisc import asyncbackoff
24from aws_request_signer import UNSIGNED_PAYLOAD
25from httpx import URL, HTTPError, AsyncClient, Response, QueryParams, HTTPStatusError
26
27from ._async import generate_in_thread, run_in_thread, parallel_file_writer, ChunkSendStream
28from ._xml import (
29 AwsObjectMeta, create_complete_upload_request,
30 parse_create_multipart_upload_id, parse_list_objects, parse_error,
31)
32from .credentials import (
33 AbstractCredentials, collect_credentials,
34)
35from .exceptions import S3Error, ConflictError, PreconditionFailed
36
37
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Default number of bytes read per chunk when streaming a file (64 KiB).
CHUNK_SIZE = 2 ** 16

# Sentinel object; presumably an end-of-stream marker (not referenced in the
# visible part of this module -- confirm before removing).
DONE = object()

# Hex SHA-256 digest of the empty byte string, used as the payload hash for
# requests that carry no body.
EMPTY_STR_HASH = hashlib.sha256(b"").hexdigest()

# Default part / range size for multipart transfers: 5 MiB.
PART_SIZE = 5 * 1024 * 1024

# Request headers as accepted by the client methods.
HeadersType = t.Union[t.Dict]

DataType = t.Optional[t.Mapping[str, t.Any]]
RequestContent = t.Optional[
    t.Union[str, bytes, t.Iterable[bytes], t.AsyncIterable[bytes]]
]
PrimitiveData = t.Optional[t.Union[str, int, float, bool]]
QueryParamTypes = t.Union[
    QueryParams,
    t.Mapping[str, t.Union[PrimitiveData, t.Sequence[PrimitiveData]]],
    t.List[t.Tuple[str, PrimitiveData]],
    t.Tuple[t.Tuple[str, PrimitiveData], ...],
    str,
    bytes,
]
60
61
@dataclass
class HEADERS:
    """Canonical spellings of the HTTP header names used by this module."""

    CONTENT_LENGTH = 'Content-Length'
    CONTENT_TYPE = 'Content-Type'
66
67
@run_in_thread
def concat_files(
    target_file: Path, files: t.List[t.IO[bytes]], buffer_size: int,
) -> None:
    """
    Append the full contents of every file in ``files`` to ``target_file``.

    Each source file is rewound to its start, copied in reads of
    ``buffer_size`` bytes, and closed once it has been copied.

    Args:
        target_file: path of the file to append to (opened in ``"ab"`` mode)
        files: open binary files to copy, in order
        buffer_size: number of bytes requested per read
    """
    with target_file.open("ab") as sink:
        for source in files:
            source.seek(0)
            # An empty read signals end of file.
            while block := source.read(buffer_size):
                sink.write(block)
            source.close()
81
82
@run_in_thread
def write_from_start(
    file: io.BytesIO, chunk: bytes, range_start: int, pos: int,
) -> None:
    """
    Write ``chunk`` into ``file`` at offset ``pos - range_start``.

    Args:
        file: seekable binary buffer to write into
        chunk: bytes to write
        range_start: position that corresponds to offset 0 of ``file``
        pos: position of ``chunk``, in the same coordinates as ``range_start``
    """
    offset = pos - range_start
    file.seek(offset)
    file.write(chunk)
89
90
@generate_in_thread
def gen_without_hash(
    stream: t.Iterable[bytes],
) -> t.Generator[t.Tuple[None, bytes], None, None]:
    """
    Yield ``(None, chunk)`` for every chunk of ``stream``.

    Produces the same tuple shape as :func:`gen_with_hash`, with ``None``
    in place of the digest.
    """
    yield from ((None, block) for block in stream)
97
98
@generate_in_thread
def gen_with_hash(
    stream: t.Iterable[bytes],
) -> t.Generator[t.Tuple[str, bytes], None, None]:
    """
    Yield ``(sha256_hexdigest, chunk)`` for every chunk of ``stream``.
    """
    for block in stream:
        digest = hashlib.sha256(block).hexdigest()
        yield digest, block
105
106
def file_sender(
    file_name: str|os.PathLike, chunk_size: int = CHUNK_SIZE,
) -> t.Iterable[bytes]:
    """
    Lazily read a file and yield its contents in binary chunks.

    Args:
        file_name: path of the file to read
        chunk_size: number of bytes requested per read

    Returns:
        : A generator of non-empty ``bytes`` chunks; the file is closed
        when the generator finishes.
    """
    with open(file_name, "rb") as source:
        # An empty read signals end of file.
        while block := source.read(chunk_size):
            yield block
116
117
118async_file_sender = generate_in_thread(file_sender)
119
120MultiPart = tuple
121
122#: Type hint for anything expecting a specific object
123type S3Object = str | URL
124#: Type hint for anything expecting a bucket
125type S3Bucket = str | URL
126
[docs]
127class S3Client:
128 def __init__(
129 self, client: AsyncClient, url: t.Union[URL, str],
130 secret_access_key: t.Optional[str] = None,
131 access_key_id: t.Optional[str] = None,
132 session_token: t.Optional[str] = None,
133 region: str = "",
134 credentials: t.Optional[AbstractCredentials] = None,
135 ):
136 url = URL(url)
137 if credentials is None:
138 credentials = collect_credentials(
139 url=url,
140 access_key_id=access_key_id,
141 region=region,
142 secret_access_key=secret_access_key,
143 session_token=session_token,
144 )
145
146 if not credentials:
147 raise ValueError(
148 f"Credentials {credentials!r} is incomplete",
149 )
150
151 self._url = url
152 self._client = client
153 self._credentials = credentials
154
155 def __repr__(self):
156 return f"<{type(self).__name__} {self._url!r} creds={self._credentials!r}>"
157
158 @property
159 def url(self) -> URL:
160 """
161 S3 Endpoint, see :ref:`urls`.
162 """
163 return self._url
164
165 def _mkurl(self, path: S3Object|S3Bucket) -> URL:
166 """
167 Combine a path/s3-url with the endpoint to create the real URL.
168 """
169 # This method is where host-based buckets would go.
170 if not path:
171 # Referring to the implicit bucket
172 return self._url
173 elif isinstance(path, str):
174 # Classical string-based "<bucket>/<object>" (or whatever)
175 return self._url.join(path)
176 elif isinstance(path, URL):
177 if path.scheme == "s3" or path.scheme.startswith("s3+") or path.scheme.endswith("+s3"):
178 # s3://<bucket>/<object>
179 # s3+<whatever>://<bucket>/<object>
180 # <whatever>+s3://<bucket>/<object>
181 if path.path in ("", "/"):
182 # No object
183 if path.host:
184 return self._url.join(path.host)
185 else:
186 # Implied bucket
187 return self._url
188 elif not path.path.startswith("/"):
189 raise ValueError(f"Relative S3 URLs not supported ({path!r})")
190 elif not path.host:
191 # Implied bucket
192 assert path.path[0] == "/"
193 return self._url.join(path.path[1:])
194 else:
195 # Bucket and object given
196 return self._url.join(f"{path.host}{path.path}")
197 elif path.scheme in ("http", "https"):
198 # Full URL, I guess
199 return self._url.join(path)
200 else:
201 raise ValueError(f"Don't know how to make a request URL out of {path!r}")
202 else:
203 raise TypeError(f"Don't know how to make a request URL out of {path!r}")
204
205 async def request(
206 self, method: str, path: S3Object|S3Bucket,
207 headers: t.Optional[HeadersType] = None,
208 params: t.Optional[QueryParams] = None,
209 content: t.Optional[RequestContent] = None,
210 content_sha256: t.Optional[str] = None,
211 **kwargs,
212 ) -> Response:
213 """
214 Make an arbitrary request.
215
216 :meta private:
217 """
218 headers = self._prepare_headers(headers)
219
220 if content is not None and content_sha256 is None:
221 content_sha256 = UNSIGNED_PAYLOAD
222
223 url = self._mkurl(path)
224 if params:
225 url = url.copy_merge_params(params)
226
227 headers = self._make_headers(headers)
228 headers.update(
229 self._credentials.signer.sign_with_headers(
230 method, str(url), headers=headers, content_hash=content_sha256,
231 ),
232 )
233 resp = await self._client.request(
234 method, url, headers=headers, content=content, **kwargs,
235 )
236 try:
237 resp.raise_for_status()
238 except HTTPError as herr:
239 # Annotate the original exception with useful information
240 try:
241 herr.add_note(f"URL: {herr.request.url}")
242 except Exception:
243 pass
244 if hasattr(herr, "response"):
245 try:
246 herr.add_note(f"Body:\n{textwrap.indent(herr.response.text, ' ')}")
247 except Exception:
248 pass
249
250 if resp.headers.get('Content-Type', None) == 'application/xml' and b'<Error>' in resp.content:
251 # It looks like error XML, wrap it up so it's useful
252 try:
253 err = parse_error(resp.content)
254 except ValueError:
255 # Bad XML, skip the wrapping
256 raise herr
257 try:
258 err.request = herr.request
259 except Exception:
260 pass
261 if hasattr(herr, "response"):
262 try:
263 err.response = herr.response
264 except Exception:
265 pass
266 raise err from herr
267 else:
268 raise
269 else:
270 return resp
271
[docs]
272 async def get(self, object_name: S3Object|S3Bucket, **kwargs) -> Response:
273 """
274 Get an object.
275
276 Returns:
277 : The object gotten.
278 """
279 return await self.request("GET", object_name, **kwargs)
280
[docs]
281 async def head(
282 self, object_name: S3Object|S3Bucket,
283 content_sha256=EMPTY_STR_HASH,
284 **kwargs,
285 ) -> Response:
286 """
287 Get an object's headers.
288
289 Returns:
290 : The object gotten.
291 """
292 return await self.request(
293 "HEAD", object_name, content_sha256=content_sha256, **kwargs,
294 )
295
[docs]
296 async def delete(
297 self, object_name: S3Object|S3Bucket,
298 content_sha256=EMPTY_STR_HASH,
299 **kwargs,
300 ) -> Response:
301 """
302 Delete an object.
303 """
304 return await self.request(
305 "DELETE", object_name, content_sha256=content_sha256, **kwargs,
306 )
307
308 @staticmethod
309 def _make_headers(headers: t.Optional[HeadersType]) -> dict:
310 headers = dict(headers or {})
311 return headers
312
313 def _prepare_headers(
314 self, headers: t.Optional[HeadersType],
315 file_path: str = "",
316 ) -> dict:
317 headers = self._make_headers(headers)
318
319 if HEADERS.CONTENT_TYPE not in headers:
320 content_type = guess_type(file_path)[0]
321 if content_type is None:
322 content_type = "application/octet-stream"
323
324 headers[HEADERS.CONTENT_TYPE] = content_type
325
326 return headers
327
[docs]
328 async def put(
329 self, object_name: S3Object|S3Bucket,
330 content: RequestContent,
331 **kwargs,
332 ) -> Response:
333 """
334 Update an object.
335 """
336 return await self.request("PUT", object_name, content=content, **kwargs)
337
[docs]
338 async def post(
339 self, object_name: S3Object|S3Bucket,
340 content: RequestContent = None,
341 **kwargs,
342 ) -> Response:
343 return await self.request("POST", object_name, content=content, **kwargs)
344
[docs]
345 async def put_file(
346 self, object_name: S3Object,
347 file_path: str|os.PathLike,
348 *, headers: t.Optional[HeadersType] = None,
349 chunk_size: int = CHUNK_SIZE, content_sha256: t.Optional[str] = None,
350 ) -> Response:
351 """
352 Update an object from a file (by path)
353 """
354 headers = self._prepare_headers(headers, str(file_path))
355 return await self.put(
356 object_name,
357 headers=headers,
358 content=async_file_sender(
359 file_path,
360 chunk_size=chunk_size,
361 ),
362 content_sha256=content_sha256,
363 )
364
365 @asyncbackoff(
366 None, None, 0,
367 max_tries=3, exceptions=(HTTPError,),
368 )
369 async def _create_multipart_upload(
370 self,
371 object_name: S3Object,
372 headers: t.Optional[HeadersType] = None,
373 ) -> str:
374 resp = await self.post(
375 object_name,
376 headers=headers,
377 params={"uploads": 1},
378 content_sha256=EMPTY_STR_HASH,
379 )
380 payload = resp.read()
381 return parse_create_multipart_upload_id(payload)
382
383 @asyncbackoff(
384 None, None, 0,
385 max_tries=3, exceptions=(S3Error, HTTPError),
386 )
387 async def _complete_multipart_upload(
388 self,
389 upload_id: str,
390 object_name: S3Object,
391 parts: t.List[t.Tuple[int, str]],
392 ) -> None:
393 complete_upload_request = create_complete_upload_request(parts)
394 resp = await self.post(
395 object_name,
396 headers={"Content-Type": "text/xml"},
397 params={"uploadId": upload_id},
398 content=complete_upload_request,
399 content_sha256=hashlib.sha256(complete_upload_request).hexdigest(),
400 )
401
402 async def _put_part(
403 self,
404 upload_id: str,
405 object_name: S3Object,
406 part_no: int,
407 content: RequestContent,
408 content_sha256: str,
409 **kwargs,
410 ) -> str:
411 resp = await self.put(
412 object_name,
413 params={"partNumber": part_no, "uploadId": upload_id},
414 content=content,
415 content_sha256=content_sha256,
416 **kwargs,
417 )
418 return resp.headers["Etag"].strip('"')
419
420 async def _part_uploader(
421 self,
422 upload_id: str,
423 object_name: S3Object,
424 parts_stream: anyio.streams.memory.MemoryObjectReceiveStream[MultiPart],
425 results_queue: deque,
426 part_upload_tries: int,
427 **kwargs,
428 ) -> None:
429 backoff = asyncbackoff(
430 None, None,
431 max_tries=part_upload_tries,
432 exceptions=(HTTPError,),
433 )
434 async for part_no, part_hash, part in parts_stream:
435 etag = await backoff(self._put_part)(
436 upload_id=upload_id,
437 object_name=object_name,
438 part_no=part_no,
439 content=part,
440 content_sha256=part_hash,
441 **kwargs,
442 )
443 log.debug(
444 "Etag for part %d of %s is %s", part_no, upload_id, etag,
445 )
446 results_queue.append((part_no, etag))
447
[docs]
448 async def put_file_multipart(
449 self,
450 object_name: S3Object,
451 file_path: str|os.PathLike,
452 *,
453 headers: t.Optional[HeadersType] = None,
454 part_size: int = PART_SIZE,
455 workers_count: int = 1,
456 max_size: t.Optional[int] = None,
457 part_upload_tries: int = 3,
458 calculate_content_sha256: bool = True,
459 **kwargs,
460 ) -> None:
461 """
462 Upload data from a file with multipart upload
463
464 Args:
465 object_name: key in s3
466 file_path: path to a file for upload
467 headers: additional headers, such as Content-Type
468 part_size: size of a chunk to send (recommended: >5Mb)
469 workers_count: count of coroutines for asyncronous parts uploading
470 max_size: maximum size of a queue with data to send (should be
471 at least ``workers_count``)
472 part_upload_tries: how many times trying to put part to s3 before fail
473 calculate_content_sha256: whether to calculate sha256 hash of a part
474 for integrity purposes
475 """
476 log.debug(
477 "Going to multipart upload %s to %s with part size %d",
478 file_path, object_name, part_size,
479 )
480 await self.put_multipart(
481 object_name,
482 file_sender(
483 file_path,
484 chunk_size=part_size,
485 ),
486 headers=headers,
487 workers_count=workers_count,
488 max_size=max_size,
489 part_upload_tries=part_upload_tries,
490 calculate_content_sha256=calculate_content_sha256,
491 **kwargs,
492 )
493
494 async def _parts_generator(
495 self, gen: t.AsyncIterable[tuple], workers_count: int, parts_stream: anyio.streams.memory.MemoryObjectSendStream[MultiPart],
496 ) -> int:
497 part_no = 1
498 async with parts_stream:
499 async for part_hash, part in gen:
500 log.debug(
501 "Reading part %d (%d bytes)", part_no, len(part),
502 )
503 await parts_stream.send((part_no, part_hash, part))
504 part_no += 1
505
506 return part_no
507
[docs]
508 async def put_multipart(
509 self,
510 object_name: S3Object,
511 content: t.Iterable[bytes],
512 *,
513 headers: t.Optional[HeadersType] = None,
514 workers_count: int = 1,
515 max_size: t.Optional[int] = None,
516 part_upload_tries: int = 3,
517 calculate_content_sha256: bool = True,
518 **kwargs,
519 ) -> None:
520 """
521 Send data from iterable with multipart upload
522
523 Args:
524 object_name: key in s3
525 data: any iterable that returns chunks of bytes
526 headers: additional headers, such as Content-Type
527 workers_count: count of coroutines for asyncronous parts uploading
528 max_size: maximum size of a queue with data to send (should be
529 at least ``workers_count``)
530 part_upload_tries: how many times trying to put part to s3 before fail
531 calculate_content_sha256: whether to calculate sha256 hash of a part
532 for integrity purposes
533 """
534 if workers_count < 1:
535 raise ValueError(
536 f"Workers count should be > 0. Got {workers_count}",
537 )
538 max_size = max_size or workers_count
539
540 upload_id = await self._create_multipart_upload(
541 object_name,
542 headers=headers,
543 )
544 log.debug("Got upload id %s for %s", upload_id, object_name)
545
546 results_queue: deque = deque()
547 try:
548 async with anyio.create_task_group() as tg:
549 send_stream, receive_stream = anyio.create_memory_object_stream()
550 for wid in range(workers_count):
551 tg.start_soon(partial(
552 self._part_uploader,
553 upload_id,
554 object_name,
555 receive_stream.clone(),
556 results_queue,
557 part_upload_tries,
558 **kwargs,
559 ), name=f"put-worker-{upload_id}@{wid}")
560 # Get rid of our copy
561 receive_stream.close()
562 del receive_stream
563
564 if calculate_content_sha256:
565 gen = gen_with_hash(content)
566 else:
567 gen = gen_without_hash(content)
568
569 part_no = await self._parts_generator(gen, workers_count, send_stream)
570 except* Exception as excgroup:
571 for exc in excgroup.exceptions:
572 raise exc from None
573
574 log.debug(
575 "All parts (#%d) of %s are uploaded to %s",
576 part_no - 1, upload_id, object_name,
577 )
578
579 # Parts should be in ascending order
580 parts = sorted(results_queue, key=lambda x: x[0])
581 await self._complete_multipart_upload(
582 upload_id, object_name, parts,
583 )
584
585 async def _download_range(
586 self,
587 object_name: S3Object,
588 writer: ChunkSendStream,
589 *,
590 etag: str,
591 pos: int,
592 range_start: int,
593 req_range_start: int,
594 req_range_end: int,
595 buffer_size: int,
596 headers: t.Optional[HeadersType] = None,
597 **kwargs,
598 ) -> None:
599 """
600 Downloading range [req_range_start:req_range_end] to `file`
601 """
602 log.debug(
603 "Downloading %s from %d to %d",
604 object_name,
605 req_range_start,
606 req_range_end,
607 )
608 if not headers:
609 headers = {}
610 headers = headers.copy()
611 headers["Range"] = f"bytes={req_range_start}-{req_range_end}"
612 headers["If-Match"] = etag
613
614 resp = await self.get(object_name, headers=headers, **kwargs)
615 if resp.status_code not in (HTTPStatus.PARTIAL_CONTENT, HTTPStatus.OK):
616 raise HTTPStatusError(
617 f"Got wrong status code {resp.status_code} on range download "
618 f"of {object_name}",
619 request=resp.request, response=resp
620 )
621 assert 'Content-Range' in resp.headers
622 assert resp.headers['Content-Range'].startswith(f"bytes {req_range_start}-{req_range_end}/")
623 # FIXME: Handle OK
624 # FIXME: Handle Content-Range being different from requested
625 pos = req_range_start
626 async for chunk in resp.aiter_bytes(buffer_size):
627 if not chunk:
628 break
629 await writer.send((pos, chunk))
630 pos += len(chunk)
631
632 async def _download_worker(
633 self,
634 object_name: S3Object,
635 writer: ChunkSendStream,
636 *,
637 etag: str,
638 range_step: int,
639 range_start: int,
640 range_end: int,
641 buffer_size: int,
642 range_get_tries: int = 3,
643 headers: t.Optional[HeadersType] = None,
644 **kwargs,
645 ) -> None:
646 """
647 Downloads data in range `[range_start, range_end)`
648 with step `range_step` to file `file_path`.
649 Uses `etag` to make sure that file wasn't changed in the process.
650 """
651 log.debug(
652 "Starting download worker for range [%d:%d]",
653 range_start,
654 range_end,
655 )
656 async with writer:
657 backoff = asyncbackoff(
658 None, None,
659 max_tries=range_get_tries,
660 exceptions=(HTTPError,),
661 )
662 req_range_end = range_start
663 for req_range_start in range(range_start, range_end, range_step):
664 req_range_end += range_step
665 if req_range_end > range_end:
666 req_range_end = range_end
667 await backoff(self._download_range)(
668 object_name,
669 writer,
670 etag=etag,
671 pos=(req_range_start - range_start),
672 range_start=range_start,
673 req_range_start=req_range_start,
674 req_range_end=req_range_end - 1,
675 buffer_size=buffer_size,
676 headers=headers,
677 **kwargs,
678 )
679
[docs]
680 async def get_file_parallel(
681 self,
682 object_name: S3Object,
683 file_path: str|os.PathLike,
684 *,
685 headers: t.Optional[HeadersType] = None,
686 range_step: int = PART_SIZE,
687 workers_count: int = 1,
688 range_get_tries: int = 3,
689 buffer_size: int = PAGESIZE * 32,
690 **kwargs,
691 ) -> None:
692 """
693 Download object in parallel with requests with Range. If file changed
694 while download is in progress, an error will be raised.
695
696 Args:
697 object_name: s3 key to download
698 file_path: target file path
699 headers: additional headers
700 range_step: how much data will be downloaded in single HTTP request
701 workers_count: count of parallel workers
702 range_get_tries: count of tries to download each range
703 buffer_size: size of a buffer for on the fly data
704 """
705 file_path = anyio.Path(file_path)
706 resp = await self.head(object_name, headers=headers)
707
708 etag = resp.headers["Etag"]
709 file_size = int(resp.headers["Content-Length"])
710 log.debug(
711 "Object's %s etag is %s and size is %d",
712 object_name,
713 etag,
714 file_size,
715 )
716
717 worker_range_size = file_size // workers_count
718 range_end = 0
719 try:
720 try:
721 async with (
722 await anyio.open_file(file_path, "w+b") as fp,
723 parallel_file_writer(fp) as pfw,
724 anyio.create_task_group() as tg,
725 ):
726 for range_start in range(0, file_size, worker_range_size):
727 range_end += worker_range_size
728 if range_end > file_size:
729 range_end = file_size
730 tg.start_soon(partial(
731 self._download_worker,
732 object_name,
733 await pfw.get_block(range_start, range_end),
734 buffer_size=buffer_size,
735 etag=etag,
736 headers=headers,
737 range_end=range_end,
738 range_get_tries=range_get_tries,
739 range_start=range_start,
740 range_step=range_step,
741 **kwargs,
742 ), name=f"get-worker@{range_start}")
743 except* PreconditionFailed as excgroup:
744 raise ConflictError(f"Object {object_name!r} changed while downloading") from excgroup
745 except* Exception as excgroup:
746 # Unwrap and raise just one
747 for exc in excgroup.exceptions:
748 raise exc from None
749
750 except Exception:
751 log.exception(
752 "Error on file download. Removing possibly incomplete file %s",
753 file_path,
754 )
755 with suppress(FileNotFoundError):
756 await file_path.unlink()
757 raise
758
[docs]
759 async def list_objects_v2(
760 self,
761 object_name: str|URL|None = None,
762 *,
763 bucket: S3Bucket|None = None,
764 prefix: t.Optional[t.Union[str, Path]] = None,
765 delimiter: t.Optional[str] = None,
766 max_keys: t.Optional[int] = None,
767 start_after: t.Optional[str] = None,
768 ) -> t.AsyncIterator[t.List[AwsObjectMeta]]:
769 """
770 List objects in bucket.
771
772 Args:
773 object_name:
774 path to listing endpoint, defaults to ``'/'``; a ``bucket`` value is
775 prepended to this value if provided.
776 prefix:
777 limits the response to keys that begin with the specified
778 prefix
779 delimiter: a delimiter is a character you use to group keys
780 max_keys: maximum number of keys returned in the response
781 start_after: keys to start listing after
782
783 Returns:
784 : An iterator over lists of metadata objects, each corresponding
785 to an individual response result (typically limited to 1000 keys).
786
787 .. note::
788
789 Despite the name :meth:`list_objects` is newer and probably more ergonomic.
790 """
791
792 params = {
793 "list-type": "2",
794 }
795
796 if prefix:
797 params["prefix"] = str(prefix)
798
799 if delimiter:
800 params["delimiter"] = delimiter
801
802 if max_keys:
803 params["max-keys"] = str(max_keys)
804
805 if start_after:
806 params["start-after"] = start_after
807
808 if bucket and object_name:
809 raise ValueError("Must give either bucket or object_name")
810
811 while True:
812 resp = await self.get(bucket or object_name or '', params=params, headers={'Accept': 'application/xml'})
813 payload = resp.content
814 metadata, continuation_token = parse_list_objects(payload)
815 if not metadata:
816 break
817 yield metadata
818 if not continuation_token:
819 break
820 params["continuation-token"] = continuation_token
821
[docs]
822 async def list_objects(
823 self,
824 bucket: S3Bucket|None=None,
825 *,
826 prefix: str|None = None,
827 delimiter: t.Optional[str] = None,
828 max_keys: t.Optional[int] = None,
829 start_after: t.Optional[str] = None,
830 ) -> t.AsyncIterator[AwsObjectMeta]:
831 """
832 List objects in bucket.
833
834 Args:
835 bucket:
836 Name of the bucket to list, or :data:`None` if the bucket is in the endpoint.
837 prefix:
838 limits the response to keys that begin with the specified
839 prefix
840 delimiter: a delimiter is a character you use to group keys
841 max_keys: maximum number of keys returned in the response
842 start_after: keys to start listing after
843
844 Returns:
845 : Iterator over metadata objects
846 """
847 params = {
848 "list-type": "2",
849 }
850
851 if prefix:
852 params["prefix"] = str(prefix)
853
854 if delimiter:
855 params["delimiter"] = delimiter
856
857 if max_keys:
858 params["max-keys"] = str(max_keys)
859
860 if start_after:
861 params["start-after"] = start_after
862
863 while True:
864 resp = await self.get(bucket or '', params=params, headers={'Accept': 'application/xml'})
865 payload = resp.content
866 metadata, continuation_token = parse_list_objects(payload)
867 if not metadata:
868 break
869 for objmeta in metadata:
870 yield objmeta
871 if not continuation_token:
872 break
873 params["continuation-token"] = continuation_token