Skip to content

Commit

Permalink
Merge branch 'feature/zdx' into develop
Browse files Browse the repository at this point in the history
  • Loading branch information
mitchos committed Jun 2, 2023
2 parents 584fe2f + 139b578 commit 81f29f5
Show file tree
Hide file tree
Showing 21 changed files with 1,974 additions and 1 deletion.
15 changes: 14 additions & 1 deletion docsrc/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
zs/zia/index
zs/zpa/index
zs/zcc/index
zs/zdx/index

pyZscaler SDK - Library Reference
=====================================================================
Expand Down Expand Up @@ -41,7 +42,7 @@ Products
- :doc:`Zscaler Private Access (ZPA) <zs/zpa/index>`
- :doc:`Zscaler Internet Access (ZIA) <zs/zia/index>`
- :doc:`Zscaler Mobile Admin Portal <zs/zcc/index>`
- Cloud Security Posture Management (CSPM) - (work in progress)
- :doc:`Zscaler Digital Experience (ZDX) <zs/zdx/index>`

Installation
==============
Expand Down Expand Up @@ -98,6 +99,18 @@ Quick ZCC Example
for device in zcc.devices.list_devices():
pprint(device)
Quick ZDX Example
^^^^^^^^^^^^^^^^^^^
.. code-block:: python
from pyzscaler import ZDX
from pprint import pprint
zcc = ZDX(client_id='CLIENT_ID', client_secret='CLIENT_SECRET')
for device in zdx.devices.list_devices():
pprint(device)
.. automodule:: pyzscaler
:members:
Expand Down
12 changes: 12 additions & 0 deletions docsrc/zs/zdx/admin.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
admin
------

The following methods allow for interaction with the ZDX
Admin API endpoints.

Methods are accessible via ``zdx.admin``

.. _zdx-admin:

.. automodule:: pyzscaler.zdx.admin
:members:
12 changes: 12 additions & 0 deletions docsrc/zs/zdx/apps.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
apps
------

The following methods allow for interaction with the ZDX
Application API endpoints.

Methods are accessible via ``zdx.apps``

.. _zdx-apps:

.. automodule:: pyzscaler.zdx.apps
:members:
12 changes: 12 additions & 0 deletions docsrc/zs/zdx/devices.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
devices
-------

The following methods allow for interaction with the ZDX
Devices API endpoints.

Methods are accessible via ``zdx.devices``

.. _zdx-devices:

.. automodule:: pyzscaler.zdx.devices
:members:
13 changes: 13 additions & 0 deletions docsrc/zs/zdx/index.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
ZDX
==========
This package covers the ZDX interface.

.. toctree::
:maxdepth: 1
:glob:
:hidden:

*

.. automodule:: pyzscaler.zdx
:members:
12 changes: 12 additions & 0 deletions docsrc/zs/zdx/session.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
session
-------

The following methods allow for interaction with the ZDX
Session API endpoints.

Methods are accessible via ``zdx.session``

.. _zdx-session:

.. automodule:: pyzscaler.zdx.session
:members:
1 change: 1 addition & 0 deletions pyzscaler/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,6 @@
__version__ = "1.4.1"

from pyzscaler.zcc import ZCC # noqa
from pyzscaler.zdx import ZDX # noqa
from pyzscaler.zia import ZIA # noqa
from pyzscaler.zpa import ZPA # noqa
85 changes: 85 additions & 0 deletions pyzscaler/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,51 @@ def _get_page(self) -> None:
time.sleep(1)


class ZDXIterator(APIIterator):
"""
Iterator class for ZDX endpoints.
"""

def __init__(self, api, endpoint, limit=None, **kwargs):
super().__init__(api, **kwargs)
self.endpoint = endpoint
self.limit = limit
self.next_offset = None
self.total = 0

# Load the first page
self._get_page()

def __next__(self):
try:
item = super().__next__()
except StopIteration:
if self.next_offset is None:
# There is no next page, so we're done iterating
raise
# There is another page, so get it and continue iterating
self._get_page()
item = super().__next__()
return item

def _get_page(self):
params = {"limit": self.limit, "offset": self.next_offset} if self.next_offset else {}

# Request the next page
response = self._api.get(self.endpoint, params=params)

# Extract the next offset and the data items from the response
self.next_offset = response.get("next_offset")
self.page = response["users"]

# Update the total number of records
self.total += len(self.page)

# Reset page_count for the new page
self.page_count = 0


# Maps ZCC numeric os_type and registration_type arguments to a human-readable string
zcc_param_map = {
"os": {
Expand All @@ -151,3 +196,43 @@ def _get_page(self) -> None:
"quarantined": 6,
},
}


def calculate_epoch(hours: int):
current_time = int(time.time())
past_time = int(current_time - (hours * 3600))
return current_time, past_time


def zdx_params(func):
"""
Decorator to add custom parameter functionality for ZDX API calls.
Args:
func: The function to decorate.
Returns:
The decorated function.
"""

def wrapper(self, *args, **kwargs):
since = kwargs.pop("since", None)
search = kwargs.pop("search", None)
location_id = kwargs.pop("location_id", None)
department_id = kwargs.pop("department_id", None)
geo_id = kwargs.pop("geo_id", None)

if since:
current_time, past_time = calculate_epoch(since)
kwargs["to"] = current_time
kwargs["from"] = past_time

kwargs["q"] = search or kwargs.get("q")
kwargs["loc"] = location_id or kwargs.get("loc")
kwargs["dept"] = department_id or kwargs.get("dept")
kwargs["geo"] = geo_id or kwargs.get("geo")

return func(self, *args, **kwargs)

return wrapper
84 changes: 84 additions & 0 deletions pyzscaler/zdx/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import os

from box import Box
from restfly.session import APISession

from pyzscaler import __version__
from .admin import AdminAPI
from .apps import AppsAPI
from .devices import DevicesAPI
from .session import SessionAPI
from .users import UsersAPI


class ZDX(APISession):
"""
A Controller to access Endpoints in the Zscaler Digital Experience (ZDX) API.
The ZDX object stores the session token and simplifies access to CRUD options within the ZDX Portal.
Attributes:
client_id (str): The ZDX Client ID generated from the ZDX Portal.
client_secret (str): The ZDX Client Secret generated from the ZDX Portal.
cloud (str): The Zscaler cloud for your tenancy, accepted values are below. Defaults to ``zdxcloud``.
* ``zdxcloud``
* ``zdxbeta``
override_url (str):
If supplied, this attribute can be used to override the production URL that is derived
from supplying the `cloud` attribute. Use this attribute if you have a non-standard tenant URL
(e.g. internal test instance etc). When using this attribute, there is no need to supply the `cloud`
attribute. The override URL will be prepended to the API endpoint suffixes. The protocol must be included
i.e. http:// or https://.
"""

_vendor = "Zscaler"
_product = "pyZscaler"
_backoff = 3
_build = __version__
_box = True
_box_attrs = {"camel_killer_box": True}
_env_base = "ZDX"
_env_cloud = "zdxcloud"
_url = "https://api.zdxcloud.net/v1"

def __init__(self, **kw):
self._client_id = kw.get("client_id", os.getenv(f"{self._env_base}_CLIENT_ID"))
self._client_secret = kw.get("client_secret", os.getenv(f"{self._env_base}_CLIENT_SECRET"))
self._cloud = kw.get("cloud", os.getenv(f"{self._env_base}_CLOUD", self._env_cloud))
self._url = kw.get("override_url", os.getenv(f"{self._env_base}_OVERRIDE_URL")) or f"https://api.{self._cloud}.net/v1"
self.conv_box = True
super(ZDX, self).__init__(**kw)

def _build_session(self, **kwargs) -> Box:
"""Creates a ZCC API session."""
super(ZDX, self)._build_session(**kwargs)
self._auth_token = self.session.create_token(client_id=self._client_id, client_secret=self._client_secret).token
return self._session.headers.update({"Authorization": f"Bearer {self._auth_token}"})

@property
def session(self):
"""The interface object for the :ref:`ZDX Session interface <zdx-session>`."""
return SessionAPI(self)

@property
def admin(self):
"""The interface object for the :ref:`ZDX Admin interface <zdx-admin>`."""
return AdminAPI(self)

@property
def apps(self):
"""The interface object for the :ref:`ZDX Apps interface <zdx-apps>`."""
return AppsAPI(self)

@property
def devices(self):
"""The interface object for the :ref:`ZDX Devices interface <zdx-devices>`."""
return DevicesAPI(self)

@property
def users(self):
"""The interface object for the :ref:`ZDX Users interface <zdx-users>`."""
return UsersAPI(self)
69 changes: 69 additions & 0 deletions pyzscaler/zdx/admin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
from box import BoxList
from restfly.endpoint import APIEndpoint

from pyzscaler.utils import zdx_params


class AdminAPI(APIEndpoint):
@zdx_params
def list_departments(self, **kwargs) -> BoxList:
"""
Returns a list of departments that are configured within ZDX.
Keyword Args:
since (int): The number of hours to look back for devices.
search (str): The search string to filter by name or department ID.
Returns:
:obj:`BoxList`: The list of departments in ZDX.
Examples:
List all departments in ZDX for the past 2 hours
>>> for department in zdx.admin.list_departments():
... print(department)
"""

return self._get("administration/departments", params=kwargs)

@zdx_params
def list_locations(self, **kwargs) -> BoxList:
"""
Returns a list of locations that are configured within ZDX.
Keyword Args:
since (int): The number of hours to look back for devices.
search (str): The search string to filter by name or location ID.
Returns:
:obj:`BoxList`: The list of locations in ZDX.
Examples:
List all locations in ZDX for the past 2 hours
>>> for location in zdx.admin.list_locations():
... print(location)
"""
return self._get("administration/locations", params=kwargs)

@zdx_params
def list_geolocations(self, **kwargs) -> BoxList:
"""
Returns a list of all active geolocations configured within the ZDX tenant.
Keyword Args:
since (int): The number of hours to look back for devices.
location_id (str): The unique ID for the location.
parent_geo_id (str): The unique ID for the parent geolocation.
search (str): The search string to filter by name.
Returns:
:obj:`BoxList`: The list of geolocations in ZDX.
Examples:
List all geolocations in ZDX for the past 2 hours
>>> for geolocation in zdx.admin.list_geolocations():
... print(geolocation)
"""
return self._get("active_geo", params=kwargs)
Loading

0 comments on commit 81f29f5

Please sign in to comment.