From aefd5e425f82280a6e03ac3120c3a31a18ba64c3 Mon Sep 17 00:00:00 2001 From: Aaron Edwards Date: Tue, 12 Nov 2024 22:25:08 -0800 Subject: [PATCH 1/2] Feature implementation, docs, and tests for picklepete/pyicloud#467 --- README.rst | 30 +++++++ pyicloud/services/drive.py | 108 +++++++++++++++++++++++- tests/__init__.py | 26 ++++++ tests/const_drive.py | 167 +++++++++++++++++++++++++++++++++++++ tests/test_drive.py | 31 ++++++- 5 files changed, 359 insertions(+), 3 deletions(-) diff --git a/README.rst b/README.rst index 8d7a8791..ab25f6bf 100644 --- a/README.rst +++ b/README.rst @@ -360,6 +360,36 @@ The ``upload`` method can be used to send a file-like object to the iCloud Drive It is strongly suggested to open file handles as binary rather than text to prevent decoding errors further down the line. +You can also interact with files in the ``trash``: + +.. code-block:: pycon + + >>> delete_output = api.drive['Holiday Photos']['2013']['Sicily']['DSC08116.JPG'].delete() + >>> api.drive.trash.dir() + ['DSC08116.JPG'] + + >>> delete_output = api.drive['Holiday Photos']['2013']['Sicily']['DSC08117.JPG'].delete() + >>> api.drive.trash.dir() + ['DSC08116.JPG', 'DSC08117.JPG'] + +You can interact with the ``trash`` similar to a standard directory, with some restrictions. In addition, files in the ``trash`` can be recovered back to their original location, or deleted forever: + +.. code-block:: pycon + + >>> api.drive['Holiday Photos']['2013']['Sicily'].dir() + [] + + >>> recover_output = api.drive.trash['DSC08116.JPG'].recover() + >>> api.drive['Holiday Photos']['2013']['Sicily'].dir() + ['DSC08116.JPG'] + + >>> api.drive.trash.dir() + ['DSC08117.JPG'] + + >>> purge_output = api.drive.trash['DSC08117.JPG'].delete_forever() + >>> api.drive.trash.dir() + [] + Photo Library ======================= diff --git a/pyicloud/services/drive.py b/pyicloud/services/drive.py index 87573ce6..33e471f0 100644 --- a/pyicloud/services/drive.py +++ b/pyicloud/services/drive.py @@ -6,6 +6,7 @@ import mimetypes import os import time +import uuid from re import search from requests import Response @@ -24,6 +25,7 @@ def __init__(self, service_root, document_root, session, params): self.session = session self.params = dict(params) self._root = None + self._trash = None def _get_token_from_cookie(self): for cookie in self.session.cookies: @@ -51,6 +53,17 @@ def get_node_data(self, node_id): self._raise_if_error(request) return request.json()[0] + def custom_request(self, method, path, data=None): + """Raw function to allow for custom requests""" + request = self.session.request( + method, + self._service_root + f"/{path}", + params=self.params, + data=json.dumps(data) if data else None, + ) + self._raise_if_error(request) + return request.json() + def get_file(self, file_id, **kwargs): """Returns iCloud Drive file.""" file_params = dict(self.params) @@ -216,6 +229,44 @@ def move_items_to_trash(self, node_id, etag): self._raise_if_error(request) return request.json() + def recover_items_from_trash(self, node_id, etag): + """Restores an iCloud Drive node from the trash bin""" + request = self.session.post( + self._service_root + "/putBackItemsFromTrash", + params=self.params, + data=json.dumps( + { + "items": [ + { + "drivewsid": node_id, + "etag": etag + } + ], + } + ), + ) + self._raise_if_error(request) + return request.json() + + def delete_forever_from_trash(self, node_id, etag): + """Permanently deletes an iCloud Drive node from the trash bin""" + request = self.session.post( + self._service_root + "/deleteItems", + params=self.params, + data=json.dumps( + { + "items": [ + { + "drivewsid": node_id, + "etag": etag + } + ], + } + ), + ) + self._raise_if_error(request) + return request.json() + @property def root(self): """Returns the root node.""" @@ -223,6 +274,23 @@ def root(self): self._root = DriveNode(self, self.get_node_data("root")) return self._root + @property + def trash(self): + """Returns the trash node.""" + if not self._trash: + self._trash = DriveNode(self, self.get_node_data("TRASH_ROOT")) + return self._trash + + def refresh_root(self): + """Refreshes and returns a fresh root node.""" + self._root = DriveNode(self, self.get_node_data("root")) + return self._root + + def refresh_trash(self): + """Refreshes and returns a fresh trash node.""" + self._trash = DriveNode(self, self.get_node_data("TRASH_ROOT")) + return self._trash + def __getattr__(self, attr): return getattr(self.root, attr) @@ -249,14 +317,29 @@ def __init__(self, conn, data): @property def name(self): """Gets the node name.""" + # check if name is undefined, return drivewsid instead if so. + node_name = self.data.get("name") + if not node_name: + # use drivewsid as name if no name present. + node_name = self.data.get("drivewsid") + # Clean up well-known drivewsid names + if node_name == "FOLDER::com.apple.CloudDocs::root": + node_name = "root" + # if no name still, return unknown string. + if not node_name: + node_name = "" + if "extension" in self.data: - return "{}.{}".format(self.data["name"], self.data["extension"]) - return self.data["name"] + return "{}.{}".format(node_name, self.data["extension"]) + return node_name @property def type(self): """Gets the node type.""" node_type = self.data.get("type") + # handle trash which has no node type + if not node_type and self.data.get("drivewsid") == "TRASH_ROOT": + node_type = "trash" return node_type and node_type.lower() def get_children(self): @@ -330,6 +413,27 @@ def delete(self): self.data["drivewsid"], self.data["etag"] ) + def recover(self): + """Recovers an iCloud Drive item from trash.""" + # check to ensure item is in the trash - it should have a "restorePath" property + if self.data.get("restorePath"): + return self.connection.recover_items_from_trash( + self.data["drivewsid"], self.data["etag"] + ) + else: + raise ValueError(f"'{self.name}' does not appear to be in the Trash.") + + def delete_forever(self): + """Permanently deletes an iCloud Drive item from trash.""" + # check to ensure item is in the trash - it should have a "restorePath" property + if self.data.get("restorePath"): + return self.connection.delete_forever_from_trash( + self.data["drivewsid"], self.data["etag"] + ) + else: + raise ValueError(f"'{self.name}' does not appear to be in the Trash. Please 'delete()' it first before " + f"trying to 'delete_forever()'.") + def get(self, name): """Gets the node child.""" if self.type == "file": diff --git a/tests/__init__.py b/tests/__init__.py index 8008eafe..a9427d79 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -32,6 +32,9 @@ DRIVE_SUBFOLDER_WORKING, DRIVE_ROOT_WORKING, DRIVE_FILE_DOWNLOAD_WORKING, + DRIVE_TRASH_WORKING, + DRIVE_TRASH_RECOVER_WORKING, + DRIVE_TRASH_DELETE_FOREVER_WORKING ) from .const_findmyiphone import FMI_FAMILY_WORKING @@ -133,6 +136,8 @@ def request(self, method, url, **kwargs): return ResponseMock(DRIVE_ROOT_WORKING) if data[0].get("drivewsid") == "FOLDER::com.apple.CloudDocs::documents": return ResponseMock(DRIVE_ROOT_INVALID) + if data[0].get("drivewsid") == "FOLDER::com.apple.CloudDocs::TRASH_ROOT": + return ResponseMock(DRIVE_TRASH_WORKING) if ( data[0].get("drivewsid") == "FOLDER::com.apple.CloudDocs::1C7F1760-D940-480F-8C4F-005824A4E05B" @@ -143,6 +148,27 @@ def request(self, method, url, **kwargs): == "FOLDER::com.apple.CloudDocs::D5AA0425-E84F-4501-AF5D-60F1D92648CF" ): return ResponseMock(DRIVE_SUBFOLDER_WORKING) + + # Drive Trash Recover + if ( + "putBackItemsFromTrash" in url + and method == "POST" + and data.get("items")[0].get("drivewsid") + ): + if (data.get("items")[0].get("drivewsid") == + "FOLDER::com.apple.CloudDocs::2BF8600B-5DCC-4421-805A-1C28D07197D5"): + return ResponseMock(DRIVE_TRASH_RECOVER_WORKING) + + # Drive Trash Delete Forever + if ( + "deleteItems" in url + and method == "POST" + and data.get("items")[0].get("drivewsid") + ): + if (data.get("items")[0].get("drivewsid") == + "FOLDER::com.apple.CloudDocs::478AEA23-42A2-468A-ABC1-1A04BC07F738"): + return ResponseMock(DRIVE_TRASH_DELETE_FOREVER_WORKING) + # Drive download if "com.apple.CloudDocs/download/by_id" in url and method == "GET": if params.get("document_id") == "516C896C-6AA5-4A30-B30E-5502C2333DAE": diff --git a/tests/const_drive.py b/tests/const_drive.py index 9987c1b6..ea972eaa 100644 --- a/tests/const_drive.py +++ b/tests/const_drive.py @@ -673,3 +673,170 @@ }, "double_etag": "32::2x", } + +DRIVE_TRASH_WORKING = [ + { + "items": [ + { + "dateCreated": "2022-06-23T20:58:35Z", + "drivewsid": "FILE::com.apple.CloudDocs::C2AD01E4-E625-47FE-AE83-4DF311A05A48", + "docwsid": "C2AD01E4-E625-47FE-AE83-4DF311A05A48", + "zone": "com.apple.CloudDocs", + "name": "dead-file", + "extension": "download", + "parentId": "TRASH_ROOT", + "dateExpiration": "2024-12-12T02:17:55Z", + "isChainedToParent": True, + "dateModified": "2022-06-23T20:43:02Z", + "dateChanged": "2024-11-12T02:17:55Z", + "size": 11364977, + "etag": "o72::o6y", + "restorePath": "Downloads/dead-file.download", + "lastOpenTime": "2024-11-12T02:15:18Z", + "type": "FILE" + }, + { + "dateCreated": "2024-11-12T04:41:18Z", + "drivewsid": "FOLDER::com.apple.CloudDocs::31102B37-D62F-4322-862C-EDE2030C8AFA", + "docwsid": "31102B37-D62F-4322-862C-EDE2030C8AFA", + "zone": "com.apple.CloudDocs", + "name": "test_create_folder", + "parentId": "TRASH_ROOT", + "dateExpiration": "2024-12-12T04:48:22Z", + "isChainedToParent": True, + "restorePath": "test_create_folder", + "etag": "o96", + "type": "FOLDER", + "assetQuota": 0, + "fileCount": 0, + "shareCount": 0, + "shareAliasCount": 0, + "directChildrenCount": 0 + }, + { + "dateCreated": "2024-11-12T04:18:13Z", + "drivewsid": "FOLDER::com.apple.CloudDocs::478AEA23-42A2-468A-ABC1-1A04BC07F738", + "docwsid": "478AEA23-42A2-468A-ABC1-1A04BC07F738", + "zone": "com.apple.CloudDocs", + "name": "test_delete_forever_and_ever", + "parentId": "TRASH_ROOT", + "dateExpiration": "2024-12-12T04:18:20Z", + "isChainedToParent": True, + "restorePath": "test_delete_forever_and_ever", + "etag": "o8h", + "type": "FOLDER", + "assetQuota": 0, + "fileCount": 0, + "shareCount": 0, + "shareAliasCount": 0, + "directChildrenCount": 0 + }, + { + "dateCreated": "2024-11-12T03:41:18Z", + "drivewsid": "FOLDER::com.apple.CloudDocs::E63A9193-4428-4AE1-A334-83B880C75379", + "docwsid": "E63A9193-4428-4AE1-A334-83B880C75379", + "zone": "com.apple.CloudDocs", + "name": "test_files_1", + "parentId": "TRASH_ROOT", + "dateExpiration": "2024-12-12T03:42:07Z", + "isChainedToParent": True, + "restorePath": "test_files_1", + "etag": "o7s", + "type": "FOLDER", + "assetQuota": 7, + "fileCount": 1, + "shareCount": 0, + "shareAliasCount": 0, + "directChildrenCount": 1 + }, + { + "dateCreated": "2024-11-12T03:37:13Z", + "drivewsid": "FOLDER::com.apple.CloudDocs::2BF8600B-5DCC-4421-805A-1C28D07197D5", + "docwsid": "2BF8600B-5DCC-4421-805A-1C28D07197D5", + "zone": "com.apple.CloudDocs", + "name": "test_random_uuid", + "parentId": "TRASH_ROOT", + "dateExpiration": "2024-12-12T03:57:30Z", + "isChainedToParent": True, + "restorePath": "test_random_uuid", + "etag": "o9a", + "type": "FOLDER", + "assetQuota": 0, + "fileCount": 0, + "shareCount": 0, + "shareAliasCount": 0, + "directChildrenCount": 0 + }, + { + "dateCreated": "2024-11-12T04:25:27Z", + "drivewsid": "FOLDER::com.apple.CloudDocs::B9B90B8D-CCC2-4BDB-A58D-289F746C3478", + "docwsid": "B9B90B8D-CCC2-4BDB-A58D-289F746C3478", + "zone": "com.apple.CloudDocs", + "name": "test12345", + "parentId": "TRASH_ROOT", + "dateExpiration": "2024-12-12T04:31:46Z", + "isChainedToParent": True, + "restorePath": "test12345", + "etag": "o8y", + "type": "FOLDER", + "assetQuota": 0, + "fileCount": 0, + "shareCount": 0, + "shareAliasCount": 0, + "directChildrenCount": 0 + } + ], + "numberOfItems": 6, + "drivewsid": "TRASH_ROOT" + } +] + +DRIVE_TRASH_RECOVER_WORKING = { + "items": [ + { + "dateCreated": "2024-11-12T03:37:13Z", + "drivewsid": "FOLDER::com.apple.CloudDocs::2BF8600B-5DCC-4421-805A-1C28D07197D5", + "docwsid": "2BF8600B-5DCC-4421-805A-1C28D07197D5", + "zone": "com.apple.CloudDocs", + "name": "test_random_uuid", + "parentId": "FOLDER::com.apple.CloudDocs::root", + "isChainedToParent": True, + "item_id": "CJC_vaYFEAAiEH8Y2nkmm0bfntz-AmIQWC4", + "etag": "o9g", + "type": "FOLDER", + "assetQuota": 0, + "fileCount": 0, + "shareCount": 0, + "shareAliasCount": 0, + "directChildrenCount": 0, + "status": "OK" + } + ] +} + +DRIVE_TRASH_DELETE_FOREVER_WORKING = { + "items": [ + { + "dateCreated": "2024-11-12T04:18:14Z", + "drivewsid": "FOLDER::com.apple.CloudDocs::478AEA23-42A2-468A-ABC1-1A04BC07F738", + "docwsid": "478AEA23-42A2-468A-ABC1-1A04BC07F738", + "zone": "com.apple.CloudDocs", + "name": "test_delete_forever_and_ever", + "isDeleted": True, + "parentId": "FOLDER::com.apple.CloudDocs::43D7C666-6E6E-4522-8999-0B519C3A1F4B", + "dateExpiration": "2024-12-12T04:18:20Z", + "isChainedToParent": True, + "item_id": "CJqQty4QACIQjiS90WklSeGExLvHPWWruzgB", + "restorePath": "test_delete_forever_and_ever", + "etag": "null", + "type": "FOLDER", + "assetQuota": 0, + "fileCount": 0, + "shareCount": 0, + "shareAliasCount": 0, + "directChildrenCount": 0, + "status": "OK" + } + ] +} + diff --git a/tests/test_drive.py b/tests/test_drive.py index 9158e5d1..736eca7e 100644 --- a/tests/test_drive.py +++ b/tests/test_drive.py @@ -19,7 +19,8 @@ def setUp(self): def test_root(self): """Test the root folder.""" drive = self.service.drive - assert drive.name == "" + # root name is now extracted from drivewsid. + assert drive.name == "root" assert drive.type == "folder" assert drive.size is None assert drive.date_changed is None @@ -27,6 +28,34 @@ def test_root(self): assert drive.date_last_open is None assert drive.dir() == ["Keynote", "Numbers", "Pages", "Preview", "pyiCloud"] + def test_trash(self): + """Test the root folder.""" + trash = self.service.drive.trash + assert trash.name == "TRASH_ROOT" + assert trash.type == "trash" + assert trash.size is None + assert trash.date_changed is None + assert trash.date_modified is None + assert trash.date_last_open is None + assert trash.dir() == ["dead-file.download", "test_create_folder", "test_delete_forever_and_ever", + "test_files_1", "test_random_uuid", "test12345"] + + def test_trash_recover(self): + """Test recovering a file from the Trash.""" + recover_result = self.service.drive.trash["test_random_uuid"].recover() + recover_result_items = recover_result["items"][0] + assert recover_result_items["status"] == "OK" + assert recover_result_items["parentId"] == "FOLDER::com.apple.CloudDocs::root" + assert recover_result_items["name"] == "test_random_uuid" + + def test_trash_delete_forever(self): + """Test permanently deleting a file from the Trash.""" + recover_result = self.service.drive.trash["test_delete_forever_and_ever"].delete_forever() + recover_result_items = recover_result["items"][0] + assert recover_result_items["status"] == "OK" + assert recover_result_items["parentId"] == "FOLDER::com.apple.CloudDocs::43D7C666-6E6E-4522-8999-0B519C3A1F4B" + assert recover_result_items["name"] == "test_delete_forever_and_ever" + def test_folder_app(self): """Test the /Preview folder.""" folder = self.service.drive["Preview"] From 1c528742fabbed36d7ddb7548b8fa65efaafc566 Mon Sep 17 00:00:00 2001 From: Aaron Edwards Date: Tue, 12 Nov 2024 22:41:15 -0800 Subject: [PATCH 2/2] Quick README.rst fix for missing `refresh_trash()` --- README.rst | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.rst b/README.rst index ab25f6bf..171584bf 100644 --- a/README.rst +++ b/README.rst @@ -369,6 +369,7 @@ You can also interact with files in the ``trash``: ['DSC08116.JPG'] >>> delete_output = api.drive['Holiday Photos']['2013']['Sicily']['DSC08117.JPG'].delete() + >>> api.drive.refresh_trash() >>> api.drive.trash.dir() ['DSC08116.JPG', 'DSC08117.JPG'] @@ -387,6 +388,7 @@ You can interact with the ``trash`` similar to a standard directory, with some r ['DSC08117.JPG'] >>> purge_output = api.drive.trash['DSC08117.JPG'].delete_forever() + >>> api.drive.refresh_trash() >>> api.drive.trash.dir() []