Source code for handtruck._xml

 1from datetime import datetime, timezone
 2from sys import intern
 3from typing import List, NamedTuple, Optional, Tuple
 4from xml.etree import ElementTree as ET
 5
 6NS = "http://s3.amazonaws.com/doc/2006-03-01/"
 7
 8
[docs] 9class AwsObjectMeta(NamedTuple): 10 etag: str 11 key: str 12 last_modified: datetime 13 size: int 14 storage_class: str
15 16 17def parse_create_multipart_upload_id(payload: bytes) -> str: 18 root = ET.fromstring(payload) 19 uploadid_el = root.find(f"{{{NS}}}UploadId") 20 if uploadid_el is None: 21 uploadid_el = root.find("UploadId") 22 if uploadid_el is None or uploadid_el.text is None: 23 raise ValueError(f"Upload id not found in {payload!r}") 24 return uploadid_el.text 25 26 27def create_complete_upload_request(parts: List[Tuple[int, str]]) -> bytes: 28 ET.register_namespace("", NS) 29 root = ET.Element(f"{{{NS}}}CompleteMultipartUpload") 30 31 for part_no, etag in parts: 32 part_el = ET.SubElement(root, "Part") 33 etag_el = ET.SubElement(part_el, "ETag") 34 etag_el.text = etag 35 part_number_el = ET.SubElement(part_el, "PartNumber") 36 part_number_el.text = str(part_no) 37 38 return ( 39 b'<?xml version="1.0" encoding="UTF-8"?>' + 40 ET.tostring(root, encoding="UTF-8") 41 ) 42 43 44def parse_list_objects(payload: bytes) -> Tuple[ 45 List[AwsObjectMeta], Optional[str], 46]: 47 root = ET.fromstring(payload) 48 result = [] 49 for el in root.findall(f"{{{NS}}}Contents"): 50 etag = key = last_modified = size = storage_class = None 51 for child in el: 52 tag = child.tag[child.tag.rfind("}") + 1:] 53 text = child.text 54 if text is None: 55 continue 56 if tag == "ETag": 57 etag = text 58 elif tag == "Key": 59 key = text 60 elif tag == "LastModified": 61 assert text[-1] == "Z" 62 last_modified = datetime.fromisoformat(text[:-1]).replace( 63 tzinfo=timezone.utc, 64 ) 65 elif tag == "Size": 66 size = int(text) 67 elif tag == "StorageClass": 68 storage_class = intern(text) 69 if ( 70 etag and 71 key and 72 last_modified and 73 size is not None and 74 storage_class 75 ): 76 meta = AwsObjectMeta(etag, key, last_modified, size, storage_class) 77 result.append(meta) 78 nct_el = root.find(f"{{{NS}}}NextContinuationToken") 79 continuation_token = nct_el.text if nct_el is not None else None 80 return result, continuation_token