Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added support for public links #241

Merged
merged 5 commits into from
Apr 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions owncloud/owncloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,29 @@ def logout(self):
self._session.close()
return True

def anon_login(self, folder_token, folder_password=''):
self._session = requests.session()
self._session.verify = self._verify_certs
self._session.auth = (folder_token, folder_password)

url_components = parse.urlparse(self.url)
self._davpath = url_components.path + 'public.php/webdav'
self._webdav_url = self.url + 'public.php/webdav'

@classmethod
def from_public_link(cls, public_link, folder_password='', **kwargs):
public_link_components = parse.urlparse(public_link)
url = public_link_components.scheme + '://' + public_link_components.hostname
folder_token = public_link_components.path.split('/')[-1]
anon_session = cls(url, **kwargs)
anon_session.anon_login(folder_token, folder_password=folder_password)
return anon_session

def drop_file(self, file_name):
""" Convenience wrapper for put_file """
destination = '/' + os.path.basename(file_name)
return self.put_file(destination, file_name)

def file_info(self, path):
"""Returns the file info for the given remote file

Expand Down
259 changes: 258 additions & 1 deletion owncloud/test/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ def __create_file(target_file, size):
dummy_data += 'X'

if (len(dummy_data) > 0):
file_handle.write(dummy_data);
file_handle.write(dummy_data)

file_handle.close()

Expand Down Expand Up @@ -1214,5 +1214,262 @@ def test_make_request_fail_unaccepted_code(self):
def tearDown(self):
self.client.logout()

class TestPublicFolder(unittest.TestCase):

def get_dav_endpoint_version(self):
return 0

def files():
return (
['test.txt'],
['test space and + and #.txt'],
[u'文件.txt']
)

def files_content():
return (
['test.txt', b'Hello world!', 'subdir'],
['test space and + and #.txt', b'Hello space with+plus#hash!', 'subdir with space + plus and #hash'],
[u'文件.txt', u'你好世界'.encode('utf-8'), u'文件夹']
)


def setUp(self):
self.temp_dir = tempfile.gettempdir() + '/pyocclient_test%s-%s/' % (int(time.time()), random.randint(1, 1000))
os.mkdir(self.temp_dir)

self.client = owncloud.Client(Config['owncloud_url'], dav_endpoint_version=self.get_dav_endpoint_version())
self.client.login(Config['owncloud_login'], Config['owncloud_password'])
self.test_root = Config['test_root']
if not self.test_root[-1] == '/':
self.test_root += '/'
if not self.test_root[0] == '/':
self.test_root = '/' + self.test_root

# setting up the root dir
self.client.mkdir(self.test_root)
self.download_testfile = 'download_pyoctest.dat'
self.upload_testfile = 'upload_pyoctest.dat'
self.folder_password = 'secret'
temp_file = self.temp_dir + self.download_testfile

# create pathing for different scenarios
self.public_folder_download = self.test_root + 'public_share_download/' # file download only
self.public_folder_upload = self.test_root + 'public_share_upload/' # file upload only
self.pw_public_folder_download = self.test_root + 'pw_public_share_download/' # password protected download
self.pw_public_folder_upload = self.test_root + 'pw_public_share_upload/' # password protected upload

# make folder with public download only
self.client.mkdir(self.public_folder_download)
public_share_info = self.client.share_file_with_link(self.public_folder_download, perms=self.client.OCS_PERMISSION_READ)
self.public_download_link = public_share_info.get_link()
self.public_download_token = public_share_info.get_token()

# populate folder with data
temp_file = self.temp_dir + self.download_testfile
self.__create_file(temp_file, 2 * 1024)
self.client.put_file(self.public_folder_download + self.download_testfile, temp_file, chunked=False)

# make folder with public upload only
self.client.mkdir(self.public_folder_upload)
public_share_info = self.client.share_file_with_link(self.public_folder_upload, perms=self.client.OCS_PERMISSION_CREATE)
self.public_upload_link = public_share_info.get_link()
self.public_upload_token = public_share_info.get_token()

# make password protected folder with public download only
self.client.mkdir(self.pw_public_folder_download)
public_share_info = self.client.share_file_with_link(self.pw_public_folder_download, perms=self.client.OCS_PERMISSION_READ, password=self.folder_password)
self.pw_public_download_link = public_share_info.get_link()
self.pw_public_download_token = public_share_info.get_token()

# populate password protected folder with data
temp_file = self.temp_dir + self.download_testfile
self.__create_file(temp_file, 2 * 1024)
self.client.put_file(self.pw_public_folder_download + self.download_testfile, temp_file, chunked=False)

# make password protected folder with public upload only
self.client.mkdir(self.pw_public_folder_upload)
public_share_info = self.client.share_file_with_link(self.pw_public_folder_upload, perms=self.client.OCS_PERMISSION_CREATE, password=self.folder_password)
self.pw_public_upload_link = public_share_info.get_link()
self.pw_public_upload_token = public_share_info.get_token()

def tearDown(self):
self.client.delete(self.test_root)
self.client.logout()
self.anon_client.logout()
shutil.rmtree(self.temp_dir)

@staticmethod
def __create_file(target_file, size):
file_handle = open(target_file, 'w')
dummy_data = ''
for i in range(0, 1024):
dummy_data += 'X'

for i in range(0, int(size / 1024)):
# write in 1kb blocks
file_handle.write(dummy_data)

dummy_data = ''
for i in range(0, size % 1024):
dummy_data += 'X'

if (len(dummy_data) > 0):
file_handle.write(dummy_data)

file_handle.close()

def test_anon_login(self):
self.anon_client = owncloud.Client(Config['owncloud_url'])
self.anon_client.anon_login(self.public_download_token)
listing = self.anon_client.list('/')
self.assertEqual(len(listing), 1)

# repeat with wrong token
self.anon_client = owncloud.Client(Config['owncloud_url'])
self.anon_client.anon_login('badtoken')
with self.assertRaises(owncloud.ResponseError) as e:
self.anon_client.list('/')
self.assertEqual(e.exception.status_code, 401)

# repeat with upload only folder (listing not allowed)
self.anon_client = owncloud.Client(Config['owncloud_url'])
self.anon_client.anon_login(self.public_upload_token)
with self.assertRaises(owncloud.ResponseError) as e:
self.anon_client.list('/')
self.assertEqual(e.exception.status_code, 404)

def test_anon_login_pw(self):
self.anon_client = owncloud.Client(Config['owncloud_url'])
self.anon_client.anon_login(self.pw_public_download_token, folder_password=self.folder_password)
listing = self.anon_client.list('/')
self.assertEqual(len(listing), 1)

# repeat with wrong password
self.anon_client = owncloud.Client(Config['owncloud_url'])
self.anon_client.anon_login(self.pw_public_download_token, folder_password='wrongpassword')
with self.assertRaises(owncloud.ResponseError) as e:
self.anon_client.list('/')
self.assertEqual(e.exception.status_code, 401)

# repeat with upload only folder (listing not allowed)
self.anon_client = owncloud.Client(Config['owncloud_url'])
self.anon_client.anon_login(self.pw_public_upload_token, folder_password=self.folder_password)
with self.assertRaises(owncloud.ResponseError) as e:
self.anon_client.list('/')
self.assertEqual(e.exception.status_code, 404)

def test_from_link(self):
self.anon_client = owncloud.Client.from_public_link(self.public_download_link)
listing = self.anon_client.list('/')
self.assertEqual(len(listing), 1)

# repeat with wrong link
self.anon_client = owncloud.Client.from_public_link(Config['owncloud_url'] + 'index.php/s/wronglink')
with self.assertRaises(owncloud.ResponseError) as e:
self.anon_client.list('/')
self.assertEqual(e.exception.status_code, 401)

# repeat with upload only folder (listing not allowed)
self.anon_client = owncloud.Client.from_public_link(self.public_upload_link)
with self.assertRaises(owncloud.ResponseError) as e:
self.anon_client.list('/')
self.assertEqual(e.exception.status_code, 404)

def test_from_link_pw(self):
self.anon_client = owncloud.Client.from_public_link(self.pw_public_download_link, folder_password=self.folder_password)
listing = self.anon_client.list('/')
self.assertEqual(len(listing), 1)

# repeat with wrong password
self.anon_client = owncloud.Client.from_public_link(self.pw_public_download_link, folder_password='wrongpassword')
with self.assertRaises(owncloud.ResponseError) as e:
self.anon_client.list('/')
self.assertEqual(e.exception.status_code, 401)

# repeat with upload only folder (listing not allowed)
self.anon_client = owncloud.Client.from_public_link(self.pw_public_upload_link, folder_password=self.folder_password)
with self.assertRaises(owncloud.ResponseError) as e:
self.anon_client.list('/')
self.assertEqual(e.exception.status_code, 404)


@data_provider(files_content)
def test_download_file(self, file_name, content, subdir):
"""Test file download"""
# populate folder with data
temp_file = self.temp_dir + file_name
self.client.put_file_contents(self.public_folder_download + '/' + file_name, content)

self.anon_client = owncloud.Client(Config['owncloud_url'])
self.anon_client.anon_login(self.public_download_token)

self.assertTrue(self.anon_client.get_file('/' + file_name, temp_file))

f = open(temp_file, 'rb')
s = f.read()
f.close()
os.unlink(temp_file)
self.assertEqual(s, content)

# ZIP Download requires new method for anon session. API URL differs from regular session.
# def test_download_dir(self, file_name, content, subdir):

@data_provider(files_content)
def test_mkdir(self, file_name, content, subdir):
"""Test subdirectory creation"""
self.anon_client = owncloud.Client(Config['owncloud_url'])
self.anon_client.anon_login(self.public_upload_token)

self.assertTrue(self.anon_client.mkdir(subdir))
# Only users with account can get file info in upload folders
self.assertIsNotNone(self.client.file_info(self.public_folder_upload + subdir))

@data_provider(files)
def test_upload_small_file(self, file_name):
"""Test simple upload"""
temp_file = self.temp_dir + 'pyoctest.dat'
self.__create_file(temp_file, 2 * 1024)
self.anon_client = owncloud.Client(Config['owncloud_url'])
self.anon_client.anon_login(self.public_upload_token)

self.assertTrue(self.anon_client.put_file('/' + file_name, temp_file, chunked=False))
os.unlink(temp_file)

file_info = self.client.file_info(self.public_folder_upload + file_name)
self.assertIsNotNone(file_info)
self.assertEqual(file_info.get_size(), 2 * 1024)

@data_provider(files)
def test_upload_big_file(self, file_name):
"""Test chunked upload"""
temp_file = self.temp_dir + 'pyoctest.dat'
self.__create_file(temp_file, 10 * 1024 * 1024)

self.anon_client = owncloud.Client(Config['owncloud_url'])
self.anon_client.anon_login(self.public_upload_token)

self.assertTrue(self.anon_client.put_file('/' + file_name, temp_file))
os.unlink(temp_file)

file_info = self.client.file_info(self.public_folder_upload + file_name)
self.assertIsNotNone(file_info)
self.assertEqual(file_info.get_size(), 10 * 1024 * 1024)

@data_provider(files)
def test_drop_file(self, file_name):
"""Test convinience wrapper"""
temp_file = self.temp_dir + file_name
self.__create_file(temp_file, 2 * 1024)
self.anon_client = owncloud.Client(Config['owncloud_url'])
self.anon_client.anon_login(self.public_upload_token)

self.assertTrue(self.anon_client.drop_file(temp_file))
os.unlink(temp_file)

file_info = self.client.file_info(self.public_folder_upload + file_name)
self.assertIsNotNone(file_info)
self.assertEqual(file_info.get_size(), 2 * 1024)

if __name__ == '__main__':
unittest.main()