diff --git a/owncloud/owncloud.py b/owncloud/owncloud.py index 9c7f343..4277b58 100644 --- a/owncloud/owncloud.py +++ b/owncloud/owncloud.py @@ -384,6 +384,29 @@ def logout(self): self._session.close() return True + def anon_login(self, folder_token, folder_password=''): + self._session = requests.session() + self._session.verify = self._verify_certs + self._session.auth = (folder_token, folder_password) + + url_components = parse.urlparse(self.url) + self._davpath = url_components.path + 'public.php/webdav' + self._webdav_url = self.url + 'public.php/webdav' + + @classmethod + def from_public_link(cls, public_link, folder_password='', **kwargs): + public_link_components = parse.urlparse(public_link) + url = public_link_components.scheme + '://' + public_link_components.hostname + folder_token = public_link_components.path.split('/')[-1] + anon_session = cls(url, **kwargs) + anon_session.anon_login(folder_token, folder_password=folder_password) + return anon_session + + def drop_file(self, file_name): + """ Convenience wrapper for put_file """ + destination = '/' + os.path.basename(file_name) + return self.put_file(destination, file_name) + def file_info(self, path): """Returns the file info for the given remote file diff --git a/owncloud/test/test.py b/owncloud/test/test.py index 565a64a..f04a3dc 100644 --- a/owncloud/test/test.py +++ b/owncloud/test/test.py @@ -105,7 +105,7 @@ def __create_file(target_file, size): dummy_data += 'X' if (len(dummy_data) > 0): - file_handle.write(dummy_data); + file_handle.write(dummy_data) file_handle.close() @@ -1214,5 +1214,262 @@ def test_make_request_fail_unaccepted_code(self): def tearDown(self): self.client.logout() +class TestPublicFolder(unittest.TestCase): + + def get_dav_endpoint_version(self): + return 0 + + def files(): + return ( + ['test.txt'], + ['test space and + and #.txt'], + [u'文件.txt'] + ) + + def files_content(): + return ( + ['test.txt', b'Hello world!', 'subdir'], + ['test space and + and #.txt', b'Hello space with+plus#hash!', 'subdir with space + plus and #hash'], + [u'文件.txt', u'你好世界'.encode('utf-8'), u'文件夹'] + ) + + + def setUp(self): + self.temp_dir = tempfile.gettempdir() + '/pyocclient_test%s-%s/' % (int(time.time()), random.randint(1, 1000)) + os.mkdir(self.temp_dir) + + self.client = owncloud.Client(Config['owncloud_url'], dav_endpoint_version=self.get_dav_endpoint_version()) + self.client.login(Config['owncloud_login'], Config['owncloud_password']) + self.test_root = Config['test_root'] + if not self.test_root[-1] == '/': + self.test_root += '/' + if not self.test_root[0] == '/': + self.test_root = '/' + self.test_root + + # setting up the root dir + self.client.mkdir(self.test_root) + self.download_testfile = 'download_pyoctest.dat' + self.upload_testfile = 'upload_pyoctest.dat' + self.folder_password = 'secret' + temp_file = self.temp_dir + self.download_testfile + + # create pathing for different scenarios + self.public_folder_download = self.test_root + 'public_share_download/' # file download only + self.public_folder_upload = self.test_root + 'public_share_upload/' # file upload only + self.pw_public_folder_download = self.test_root + 'pw_public_share_download/' # password protected download + self.pw_public_folder_upload = self.test_root + 'pw_public_share_upload/' # password protected upload + + # make folder with public download only + self.client.mkdir(self.public_folder_download) + public_share_info = self.client.share_file_with_link(self.public_folder_download, perms=self.client.OCS_PERMISSION_READ) + self.public_download_link = public_share_info.get_link() + self.public_download_token = public_share_info.get_token() + + # populate folder with data + temp_file = self.temp_dir + self.download_testfile + self.__create_file(temp_file, 2 * 1024) + self.client.put_file(self.public_folder_download + self.download_testfile, temp_file, chunked=False) + + # make folder with public upload only + self.client.mkdir(self.public_folder_upload) + public_share_info = self.client.share_file_with_link(self.public_folder_upload, perms=self.client.OCS_PERMISSION_CREATE) + self.public_upload_link = public_share_info.get_link() + self.public_upload_token = public_share_info.get_token() + + # make password protected folder with public download only + self.client.mkdir(self.pw_public_folder_download) + public_share_info = self.client.share_file_with_link(self.pw_public_folder_download, perms=self.client.OCS_PERMISSION_READ, password=self.folder_password) + self.pw_public_download_link = public_share_info.get_link() + self.pw_public_download_token = public_share_info.get_token() + + # populate password protected folder with data + temp_file = self.temp_dir + self.download_testfile + self.__create_file(temp_file, 2 * 1024) + self.client.put_file(self.pw_public_folder_download + self.download_testfile, temp_file, chunked=False) + + # make password protected folder with public upload only + self.client.mkdir(self.pw_public_folder_upload) + public_share_info = self.client.share_file_with_link(self.pw_public_folder_upload, perms=self.client.OCS_PERMISSION_CREATE, password=self.folder_password) + self.pw_public_upload_link = public_share_info.get_link() + self.pw_public_upload_token = public_share_info.get_token() + + def tearDown(self): + self.client.delete(self.test_root) + self.client.logout() + self.anon_client.logout() + shutil.rmtree(self.temp_dir) + + @staticmethod + def __create_file(target_file, size): + file_handle = open(target_file, 'w') + dummy_data = '' + for i in range(0, 1024): + dummy_data += 'X' + + for i in range(0, int(size / 1024)): + # write in 1kb blocks + file_handle.write(dummy_data) + + dummy_data = '' + for i in range(0, size % 1024): + dummy_data += 'X' + + if (len(dummy_data) > 0): + file_handle.write(dummy_data) + + file_handle.close() + + def test_anon_login(self): + self.anon_client = owncloud.Client(Config['owncloud_url']) + self.anon_client.anon_login(self.public_download_token) + listing = self.anon_client.list('/') + self.assertEqual(len(listing), 1) + + # repeat with wrong token + self.anon_client = owncloud.Client(Config['owncloud_url']) + self.anon_client.anon_login('badtoken') + with self.assertRaises(owncloud.ResponseError) as e: + self.anon_client.list('/') + self.assertEqual(e.exception.status_code, 401) + + # repeat with upload only folder (listing not allowed) + self.anon_client = owncloud.Client(Config['owncloud_url']) + self.anon_client.anon_login(self.public_upload_token) + with self.assertRaises(owncloud.ResponseError) as e: + self.anon_client.list('/') + self.assertEqual(e.exception.status_code, 404) + + def test_anon_login_pw(self): + self.anon_client = owncloud.Client(Config['owncloud_url']) + self.anon_client.anon_login(self.pw_public_download_token, folder_password=self.folder_password) + listing = self.anon_client.list('/') + self.assertEqual(len(listing), 1) + + # repeat with wrong password + self.anon_client = owncloud.Client(Config['owncloud_url']) + self.anon_client.anon_login(self.pw_public_download_token, folder_password='wrongpassword') + with self.assertRaises(owncloud.ResponseError) as e: + self.anon_client.list('/') + self.assertEqual(e.exception.status_code, 401) + + # repeat with upload only folder (listing not allowed) + self.anon_client = owncloud.Client(Config['owncloud_url']) + self.anon_client.anon_login(self.pw_public_upload_token, folder_password=self.folder_password) + with self.assertRaises(owncloud.ResponseError) as e: + self.anon_client.list('/') + self.assertEqual(e.exception.status_code, 404) + + def test_from_link(self): + self.anon_client = owncloud.Client.from_public_link(self.public_download_link) + listing = self.anon_client.list('/') + self.assertEqual(len(listing), 1) + + # repeat with wrong link + self.anon_client = owncloud.Client.from_public_link(Config['owncloud_url'] + 'index.php/s/wronglink') + with self.assertRaises(owncloud.ResponseError) as e: + self.anon_client.list('/') + self.assertEqual(e.exception.status_code, 401) + + # repeat with upload only folder (listing not allowed) + self.anon_client = owncloud.Client.from_public_link(self.public_upload_link) + with self.assertRaises(owncloud.ResponseError) as e: + self.anon_client.list('/') + self.assertEqual(e.exception.status_code, 404) + + def test_from_link_pw(self): + self.anon_client = owncloud.Client.from_public_link(self.pw_public_download_link, folder_password=self.folder_password) + listing = self.anon_client.list('/') + self.assertEqual(len(listing), 1) + + # repeat with wrong password + self.anon_client = owncloud.Client.from_public_link(self.pw_public_download_link, folder_password='wrongpassword') + with self.assertRaises(owncloud.ResponseError) as e: + self.anon_client.list('/') + self.assertEqual(e.exception.status_code, 401) + + # repeat with upload only folder (listing not allowed) + self.anon_client = owncloud.Client.from_public_link(self.pw_public_upload_link, folder_password=self.folder_password) + with self.assertRaises(owncloud.ResponseError) as e: + self.anon_client.list('/') + self.assertEqual(e.exception.status_code, 404) + + + @data_provider(files_content) + def test_download_file(self, file_name, content, subdir): + """Test file download""" + # populate folder with data + temp_file = self.temp_dir + file_name + self.client.put_file_contents(self.public_folder_download + '/' + file_name, content) + + self.anon_client = owncloud.Client(Config['owncloud_url']) + self.anon_client.anon_login(self.public_download_token) + + self.assertTrue(self.anon_client.get_file('/' + file_name, temp_file)) + + f = open(temp_file, 'rb') + s = f.read() + f.close() + os.unlink(temp_file) + self.assertEqual(s, content) + + # ZIP Download requires new method for anon session. API URL differs from regular session. + # def test_download_dir(self, file_name, content, subdir): + + @data_provider(files_content) + def test_mkdir(self, file_name, content, subdir): + """Test subdirectory creation""" + self.anon_client = owncloud.Client(Config['owncloud_url']) + self.anon_client.anon_login(self.public_upload_token) + + self.assertTrue(self.anon_client.mkdir(subdir)) + # Only users with account can get file info in upload folders + self.assertIsNotNone(self.client.file_info(self.public_folder_upload + subdir)) + + @data_provider(files) + def test_upload_small_file(self, file_name): + """Test simple upload""" + temp_file = self.temp_dir + 'pyoctest.dat' + self.__create_file(temp_file, 2 * 1024) + self.anon_client = owncloud.Client(Config['owncloud_url']) + self.anon_client.anon_login(self.public_upload_token) + + self.assertTrue(self.anon_client.put_file('/' + file_name, temp_file, chunked=False)) + os.unlink(temp_file) + + file_info = self.client.file_info(self.public_folder_upload + file_name) + self.assertIsNotNone(file_info) + self.assertEqual(file_info.get_size(), 2 * 1024) + + @data_provider(files) + def test_upload_big_file(self, file_name): + """Test chunked upload""" + temp_file = self.temp_dir + 'pyoctest.dat' + self.__create_file(temp_file, 10 * 1024 * 1024) + + self.anon_client = owncloud.Client(Config['owncloud_url']) + self.anon_client.anon_login(self.public_upload_token) + + self.assertTrue(self.anon_client.put_file('/' + file_name, temp_file)) + os.unlink(temp_file) + + file_info = self.client.file_info(self.public_folder_upload + file_name) + self.assertIsNotNone(file_info) + self.assertEqual(file_info.get_size(), 10 * 1024 * 1024) + + @data_provider(files) + def test_drop_file(self, file_name): + """Test convinience wrapper""" + temp_file = self.temp_dir + file_name + self.__create_file(temp_file, 2 * 1024) + self.anon_client = owncloud.Client(Config['owncloud_url']) + self.anon_client.anon_login(self.public_upload_token) + + self.assertTrue(self.anon_client.drop_file(temp_file)) + os.unlink(temp_file) + + file_info = self.client.file_info(self.public_folder_upload + file_name) + self.assertIsNotNone(file_info) + self.assertEqual(file_info.get_size(), 2 * 1024) + if __name__ == '__main__': unittest.main()