Source code for handtruck._xml

  1from datetime import datetime, timezone
  2import functools
  3import re
  4from sys import intern
  5from typing import List, NamedTuple, Optional, Tuple
  6from xml.etree import ElementTree as ET
  7
  8from . import exceptions
  9
 10
# XML namespace URI used by this module to qualify element names when parsing
# S3 response documents and when building the CompleteMultipartUpload request.
NS = "http://s3.amazonaws.com/doc/2006-03-01/"
 12
 13
class AwsObjectMeta(NamedTuple):
    """Metadata of a single object entry taken from a list-objects response.

    Instances are built by ``parse_list_objects`` from the children of one
    ``Contents`` element.
    """

    # Text of the entry's ``ETag`` element, stored exactly as received.
    etag: str
    # Text of the entry's ``Key`` element.
    key: str
    # ``LastModified`` parsed into a timezone-aware datetime (UTC).
    last_modified: datetime
    # ``Size`` element converted with ``int()``.
    size: int
    # Text of the ``StorageClass`` element (interned by the parser).
    storage_class: str
def parse_create_multipart_upload_id(payload: bytes) -> str:
    """Extract the upload id from a CreateMultipartUpload response body.

    The ``UploadId`` element is looked up directly under the document root,
    first in the S3 namespace and then without any namespace.

    Args:
        payload: raw XML response body.

    Returns:
        The text of the ``UploadId`` element.

    Raises:
        ValueError: if no ``UploadId`` element with text content is found.
    """
    root = ET.fromstring(payload)
    uploadid_el = root.find(f"{{{NS}}}UploadId")
    if uploadid_el is None:
        # Fall back to an un-namespaced element.
        uploadid_el = root.find("UploadId")
    if uploadid_el is None or uploadid_el.text is None:
        raise ValueError(f"Upload id not found in {payload!r}")
    return uploadid_el.text


def create_complete_upload_request(parts: List[Tuple[int, str]]) -> bytes:
    """Build the XML body of a CompleteMultipartUpload request.

    Args:
        parts: ``(part_number, etag)`` pairs; they are emitted in the order
            given.

    Returns:
        The UTF-8 encoded document, prefixed with an XML declaration.
    """
    # NOTE: this mutates ElementTree's global prefix registry so that the
    # S3 namespace is serialized as the default (prefix-less) namespace.
    ET.register_namespace("", NS)
    root = ET.Element(f"{{{NS}}}CompleteMultipartUpload")

    for part_no, etag in parts:
        part_el = ET.SubElement(root, "Part")
        etag_el = ET.SubElement(part_el, "ETag")
        etag_el.text = etag
        part_number_el = ET.SubElement(part_el, "PartNumber")
        part_number_el.text = str(part_no)

    return (
        b'<?xml version="1.0" encoding="UTF-8"?>' +
        ET.tostring(root, encoding="UTF-8")
    )


def parse_list_objects(payload: bytes) -> Tuple[
    List[AwsObjectMeta], Optional[str],
]:
    """Parse a list-objects response into object metadata.

    Only ``Contents`` elements in the S3 namespace directly under the root
    are considered. An entry is included in the result only when all of
    ETag, Key, LastModified, Size and StorageClass were present with text;
    incomplete entries are skipped.

    Args:
        payload: raw XML response body.

    Returns:
        A ``(objects, continuation_token)`` tuple. The token is the text of
        the ``NextContinuationToken`` element, or ``None`` when the element
        is absent.

    Raises:
        ValueError: if a ``LastModified`` value does not end in ``"Z"`` or
            cannot be parsed, or if ``Size`` is not an integer.
    """
    root = ET.fromstring(payload)
    result = []
    for el in root.findall(f"{{{NS}}}Contents"):
        etag = key = last_modified = size = storage_class = None
        for child in el:
            # Strip the "{namespace}" prefix, if any, from the tag name.
            tag = child.tag[child.tag.rfind("}") + 1:]
            text = child.text
            if text is None:
                continue
            if tag == "ETag":
                etag = text
            elif tag == "Key":
                key = text
            elif tag == "LastModified":
                # Validate explicitly rather than with ``assert``: asserts
                # are removed under ``python -O``, which would let the last
                # character be dropped unchecked below.
                if not text.endswith("Z"):
                    raise ValueError(
                        f"Unexpected LastModified value {text!r}; "
                        "expected a UTC timestamp ending in 'Z'"
                    )
                # The trailing "Z" is removed before parsing and the UTC
                # timezone is attached explicitly.
                last_modified = datetime.fromisoformat(text[:-1]).replace(
                    tzinfo=timezone.utc,
                )
            elif tag == "Size":
                size = int(text)
            elif tag == "StorageClass":
                # Storage class names repeat across entries; interning lets
                # equal values share a single string object.
                storage_class = intern(text)
        if (
            etag and
            key and
            last_modified and
            size is not None and  # a size of 0 is valid
            storage_class
        ):
            meta = AwsObjectMeta(etag, key, last_modified, size, storage_class)
            result.append(meta)
    nct_el = root.find(f"{{{NS}}}NextContinuationToken")
    continuation_token = nct_el.text if nct_el is not None else None
    return result, continuation_token


# Matches the zero-width positions inside a CamelCase name where a word
# boundary occurs, e.g. "Request|Id" and "HTTP|Status".
CAMEL_PATTERN = re.compile(
    r"""
        (?<=[a-z])      # preceded by lowercase
        (?=[A-Z])       # followed by uppercase
        |               #   OR
        (?<=[A-Z])      # preceded by uppercase
        (?=[A-Z][a-z])  # followed by uppercase, then lowercase
    """,
    re.X,
)


@functools.cache
def _camel2snake(name: str) -> str:
    """Convert a CamelCase name to an interned snake_case string."""
    return intern(CAMEL_PATTERN.sub('_', name).lower())


def parse_error(payload: bytes) -> exceptions.S3Error:
    """Build an exception instance from an S3 error response body.

    Every child of the root ``Error`` element becomes a keyword argument
    named after its snake_case tag. ``Code`` selects the exception class
    from the ``exceptions`` module and ``Message`` is passed as the first
    positional argument.

    Args:
        payload: raw XML response body.

    Returns:
        The constructed exception (it is returned, not raised).

    Raises:
        ValueError: if the root element is not ``Error``.
    """
    # https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html#RESTErrorResponses
    root = ET.fromstring(payload)
    if root.tag != "Error":
        raise ValueError(f"Wrong XML; found {root.tag} at the root")
    props = {
        _camel2snake(child.tag): child.text
        for child in root
    }
    # Use defaults so that a response lacking <Code> or <Message> falls back
    # to the generic error instead of failing with KeyError.
    clsname: str = props.pop("code", None) or "S3Error"
    msg = props.pop("message", None)
    # An error code without a matching class in ``exceptions`` falls back to
    # the generic S3Error rather than raising AttributeError.
    # NOTE(review): assumes S3Error accepts the same (msg, **props) arguments
    # as the specific classes - confirm against the exceptions module.
    exc_cls = getattr(exceptions, clsname, exceptions.S3Error)
    return exc_cls(msg, **props)