Source code for handtruck._xml
1from datetime import datetime, timezone
2import functools
3import re
4from sys import intern
5from typing import List, NamedTuple, Optional, Tuple
6from xml.etree import ElementTree as ET
7
8from . import exceptions
9
10
# XML namespace URI used by this module to qualify element names when
# building and parsing S3 XML documents.
NS = "http://s3.amazonaws.com/doc/2006-03-01/"
12
13
20
21
def parse_create_multipart_upload_id(payload: bytes) -> str:
    """Extract the upload id from a create-multipart-upload response body.

    The ``UploadId`` element is searched among the root's direct children,
    first qualified with ``NS`` and then, if that finds nothing, without a
    namespace.

    Args:
        payload: Raw XML document.

    Returns:
        The text of the ``UploadId`` element.

    Raises:
        ValueError: If no ``UploadId`` element is found or it has no text.
    """
    document = ET.fromstring(payload)
    node = None
    # Try the namespaced name first, then the bare name as a fallback.
    for path in (f"{{{NS}}}UploadId", "UploadId"):
        node = document.find(path)
        if node is not None:
            break
    if node is None or node.text is None:
        raise ValueError(f"Upload id not found in {payload!r}")
    return node.text
30
31
def create_complete_upload_request(parts: List[Tuple[int, str]]) -> bytes:
    """Build the XML body of a ``CompleteMultipartUpload`` request.

    Args:
        parts: ``(part_number, etag)`` pairs; one ``Part`` element is
            emitted per pair, in the given order.

    Returns:
        The UTF-8 encoded document, prefixed with an XML declaration.
    """
    # NOTE: register_namespace() updates ElementTree's global prefix
    # registry; mapping NS to the empty prefix makes the root serialize
    # with a plain ``xmlns=`` attribute instead of a generated prefix.
    ET.register_namespace("", NS)
    envelope = ET.Element(f"{{{NS}}}CompleteMultipartUpload")

    for number, tag in parts:
        entry = ET.SubElement(envelope, "Part")
        ET.SubElement(entry, "ETag").text = tag
        ET.SubElement(entry, "PartNumber").text = str(number)

    declaration = b'<?xml version="1.0" encoding="UTF-8"?>'
    return declaration + ET.tostring(envelope, encoding="UTF-8")
47
48
def parse_list_objects(payload: bytes) -> Tuple[
    List[AwsObjectMeta], Optional[str],
]:
    """Parse the XML body of a list-objects response.

    Every ``Contents`` element (qualified with ``NS``) under the root is
    turned into an ``AwsObjectMeta(etag, key, last_modified, size,
    storage_class)``. Entries lacking any of these five values are skipped.

    Args:
        payload: Raw XML document.

    Returns:
        A ``(objects, continuation_token)`` tuple. The token is the text of
        the root's ``NextContinuationToken`` element, or ``None`` when that
        element is absent.

    Raises:
        ValueError: If a ``LastModified`` value does not end in ``"Z"`` or
            is not a valid ISO timestamp, or if ``Size`` is not an integer.
    """
    root = ET.fromstring(payload)
    result = []
    for el in root.findall(f"{{{NS}}}Contents"):
        etag = key = last_modified = size = storage_class = None
        for child in el:
            # Drop the "{namespace}" prefix, keeping only the local name.
            tag = child.tag[child.tag.rfind("}") + 1:]
            text = child.text
            if text is None:
                continue
            if tag == "ETag":
                etag = text
            elif tag == "Key":
                key = text
            elif tag == "LastModified":
                # Validate explicitly: an ``assert`` would be stripped
                # under ``python -O`` and the last character would then be
                # chopped off without any check.
                if not text.endswith("Z"):
                    raise ValueError(
                        f"Unexpected LastModified value {text!r}: "
                        "expected a UTC timestamp ending in 'Z'",
                    )
                # The trailing "Z" is removed before parsing and the UTC
                # zone it stands for is attached explicitly.
                last_modified = datetime.fromisoformat(text[:-1]).replace(
                    tzinfo=timezone.utc,
                )
            elif tag == "Size":
                size = int(text)
            elif tag == "StorageClass":
                # Storage class names repeat across entries; intern them
                # so all entries share one string object.
                storage_class = intern(text)
        if (
            etag and
            key and
            last_modified and
            size is not None and  # a size of 0 is valid
            storage_class
        ):
            meta = AwsObjectMeta(etag, key, last_modified, size, storage_class)
            result.append(meta)
    nct_el = root.find(f"{{{NS}}}NextContinuationToken")
    continuation_token = nct_el.text if nct_el is not None else None
    return result, continuation_token
86
87
# Matches the zero-width positions inside a CamelCase name where a word
# boundary lies, e.g. "Request|Id" and "HTTP|Status|Code".
CAMEL_PATTERN = re.compile(
    r"""
    (?<=[a-z])      # preceded by lowercase
    (?=[A-Z])       # followed by uppercase
    |               # OR
    (?<=[A-Z])      # preceded by uppercase
    (?=[A-Z][a-z])  # followed by uppercase, then lowercase
    """,
    re.X,
)
98
99
@functools.cache
def _camel2snake(name: str) -> str:
    """Convert a CamelCase *name* to an interned snake_case string.

    An underscore is inserted at every position matched by
    ``CAMEL_PATTERN`` and the result is lowercased. Results are memoized
    per input name.
    """
    underscored = CAMEL_PATTERN.sub('_', name)
    snake = underscored.lower()
    return intern(snake)
103
104
105def parse_error(payload: bytes) -> exceptions.S3Error:
106 # https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html#RESTErrorResponses
107 root = ET.fromstring(payload)
108 if root.tag != "Error":
109 raise ValueError(f"Wrong XML; found {root.tag} at the root")
110 props = {
111 _camel2snake(child.tag): child.text
112 for child in root
113 }
114 clsname: str = props.pop("code") or "S3Error"
115 msg = props.pop("message")
116 return getattr(exceptions, clsname)(msg, **props)