Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SB/EH] comment custom endpoint doc #38516

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ def __init__(
retry_backoff_max: int = 120,
network_tracing: bool = False,
http_proxy: Optional[Dict[str, Any]] = None,
transport_type: TransportType = TransportType.Amqp,
auth_timeout: int = 60,
prefetch: int = 300,
max_batch_size: int = 300,
Expand All @@ -49,6 +48,7 @@ def __init__(
self.backoff_max = retry_backoff_max
self.network_tracing = network_tracing
self.http_proxy = http_proxy
transport_type = kwargs.get("transport_type", None) or TransportType.Amqp
self.transport_type = TransportType.AmqpOverWebsocket if self.http_proxy else transport_type
self.auth_timeout = auth_timeout
self.prefetch = prefetch
Expand Down Expand Up @@ -81,7 +81,7 @@ def __init__(
if self.custom_endpoint_address.find("//") == -1:
self.custom_endpoint_address = "sb://" + self.custom_endpoint_address
endpoint = urlparse(self.custom_endpoint_address)
self.transport_type = TransportType.AmqpOverWebsocket
self.transport_type = kwargs.get("transport_type", None) or TransportType.AmqpOverWebsocket
self.custom_endpoint_hostname = endpoint.hostname
if amqp_transport.KIND == "pyamqp":
self.custom_endpoint_address += "/$servicebus/websocket"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ class EventHubConsumerClient(ClientBase): # pylint: disable=client-accepts-api-
:keyword custom_endpoint_address: The custom endpoint address to use for establishing a connection to
the Event Hubs service, allowing network requests to be routed through any application gateways or
other paths needed for the host environment. Default is None.
Unless specified otherwise, default transport type is TransportType.AmqpOverWebsockets.
The format would be like "sb://<custom_endpoint_hostname>:<custom_endpoint_port>".
If port is not specified in the `custom_endpoint_address`, by default port 443 will be used.
:paramtype custom_endpoint_address: str or None
Expand Down Expand Up @@ -307,6 +308,7 @@ def from_connection_string(
:keyword custom_endpoint_address: The custom endpoint address to use for establishing a connection to
the Event Hubs service, allowing network requests to be routed through any application gateways or
other paths needed for the host environment. Default is None.
Unless specified otherwise, default transport type is TransportType.AmqpOverWebsockets.
The format would be like "sb://<custom_endpoint_hostname>:<custom_endpoint_port>".
If port is not specified in the `custom_endpoint_address`, by default port 443 will be used.
:paramtype custom_endpoint_address: str or None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ class EventHubProducerClient(ClientBase): # pylint: disable=client-accepts-api-
:keyword custom_endpoint_address: The custom endpoint address to use for establishing a connection to
the Event Hubs service, allowing network requests to be routed through any application gateways or
other paths needed for the host environment. Default is None.
Unless specified otherwise, default transport type is TransportType.AmqpOverWebsockets.
The format would be like "sb://<custom_endpoint_hostname>:<custom_endpoint_port>".
If port is not specified in the `custom_endpoint_address`, by default port 443 will be used.
:paramtype custom_endpoint_address: Optional[str]
Expand Down Expand Up @@ -504,6 +505,7 @@ def from_connection_string(
:keyword custom_endpoint_address: The custom endpoint address to use for establishing a connection to
the Event Hubs service, allowing network requests to be routed through any application gateways or
other paths needed for the host environment. Default is None.
Unless specified otherwise, default transport type is TransportType.AmqpOverWebsockets.
The format would be like "sb://<custom_endpoint_hostname>:<custom_endpoint_port>".
If port is not specified in the `custom_endpoint_address`, by default port 443 will be used.
:paramtype custom_endpoint_address: Optional[str]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,10 +134,15 @@ def __init__( # pylint:disable=too-many-locals,too-many-statements
# Custom Endpoint
custom_endpoint_address = kwargs.get("custom_endpoint_address")
custom_endpoint = None
custom_port = None
if custom_endpoint_address:
custom_parsed_url = urlparse(custom_endpoint_address)
custom_port = custom_parsed_url.port or WEBSOCKET_PORT
custom_endpoint = f"{custom_parsed_url.hostname}:{custom_port}{custom_parsed_url.path}"
if transport_type.value == TransportType.Amqp.value:
custom_port = custom_parsed_url.port or SECURE_PORT
custom_endpoint = f"{custom_parsed_url.hostname}"
else:
custom_port = custom_parsed_url.port or WEBSOCKET_PORT
custom_endpoint = f"{custom_parsed_url.hostname}:{custom_port}{custom_parsed_url.path}"
self._container_id = container_id or str(uuid.uuid4())
self._network_trace = network_trace
self._network_trace_params = {"amqpConnection": self._container_id, "amqpSession": "", "amqpLink": ""}
Expand All @@ -163,6 +168,7 @@ def __init__( # pylint:disable=too-many-locals,too-many-statements
credential=kwargs["sasl_credential"],
port=self._port,
custom_endpoint=custom_endpoint,
custom_port=custom_port,
socket_timeout=self._socket_timeout,
network_trace_params=self._network_trace_params,
**kwargs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -491,9 +491,13 @@ class SSLTransport(_AbstractTransport):

def __init__(self, host, *, port=AMQPS_PORT, socket_timeout=None, ssl_opts=None, **kwargs):
self.sslopts = ssl_opts if isinstance(ssl_opts, dict) else {}
self.sslopts["server_hostname"] = host
self._custom_endpoint = kwargs.get("custom_endpoint")
self._custom_port = kwargs.get("custom_port")
self.sslopts["server_hostname"] = self._custom_endpoint or host
self._read_buffer = BytesIO()
super(SSLTransport, self).__init__(host, port=port, socket_timeout=socket_timeout, **kwargs)
super(SSLTransport, self).__init__(
self._custom_endpoint or host, port=self._custom_port or port, socket_timeout=socket_timeout, **kwargs
)

def _setup_transport(self):
"""Wrap the socket in an SSL object."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ class AMQPClientAsync(AMQPClientSync):
:keyword custom_endpoint_address: The custom endpoint address to use for establishing a connection to
the Event Hubs service, allowing network requests to be routed through any application gateways or
other paths needed for the host environment. Default is None.
Unless specified otherwise, default transport type is TransportType.AmqpOverWebsockets.
If port is not specified in the `custom_endpoint_address`, by default port 443 will be used.
:paramtype custom_endpoint_address: str
:keyword connection_verify: Path to the custom CA_BUNDLE file of the SSL certificate which is used to
Expand Down Expand Up @@ -478,6 +479,7 @@ class SendClientAsync(SendClientSync, AMQPClientAsync):
:keyword custom_endpoint_address: The custom endpoint address to use for establishing a connection to
the Event Hubs service, allowing network requests to be routed through any application gateways or
other paths needed for the host environment. Default is None.
Unless specified otherwise, default transport type is TransportType.AmqpOverWebsockets.
If port is not specified in the `custom_endpoint_address`, by default port 443 will be used.
:paramtype custom_endpoint_address: str
:keyword connection_verify: Path to the custom CA_BUNDLE file of the SSL certificate which is used to
Expand Down Expand Up @@ -686,6 +688,7 @@ class ReceiveClientAsync(ReceiveClientSync, AMQPClientAsync):
:keyword custom_endpoint_address: The custom endpoint address to use for establishing a connection to
the Event Hubs service, allowing network requests to be routed through any application gateways or
other paths needed for the host environment. Default is None.
Unless specified otherwise, default transport type is TransportType.AmqpOverWebsockets.
If port is not specified in the `custom_endpoint_address`, by default port 443 will be used.
:paramtype custom_endpoint_address: str
:keyword connection_verify: Path to the custom CA_BUNDLE file of the SSL certificate which is used to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,12 @@ def __init__( # pylint:disable=too-many-locals,too-many-statements
custom_endpoint = None
if custom_endpoint_address:
custom_parsed_url = urlparse(custom_endpoint_address)
custom_port = custom_parsed_url.port or WEBSOCKET_PORT
custom_endpoint = f"{custom_parsed_url.hostname}:{custom_port}{custom_parsed_url.path}"
if transport_type.value == TransportType.Amqp.value:
custom_port = custom_parsed_url.port or SECURE_PORT
custom_endpoint = f"{custom_parsed_url.hostname}"
else:
custom_port = custom_parsed_url.port or WEBSOCKET_PORT
custom_endpoint = f"{custom_parsed_url.hostname}:{custom_port}{custom_parsed_url.path}"
self._container_id: str = container_id or str(uuid.uuid4())
self._network_trace = network_trace
self._network_trace_params = {"amqpConnection": self._container_id, "amqpSession": "", "amqpLink": ""}
Expand All @@ -145,6 +149,7 @@ def __init__( # pylint:disable=too-many-locals,too-many-statements
credential=kwargs["sasl_credential"],
port=self._port,
custom_endpoint=custom_endpoint,
custom_port=custom_port,
socket_timeout=self._socket_timeout,
network_trace_params=self._network_trace_params,
**kwargs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,8 @@ def __init__(
self.raise_on_initial_eintr = raise_on_initial_eintr
self._read_buffer = BytesIO()
self.host, self.port = to_host_port(host, port)
self.host = kwargs.get("custom_endpoint") or self.host
self.port = kwargs.get("custom_port") or self.port
self.socket_settings = socket_settings
self.socket_lock = asyncio.Lock()
self.sslopts = ssl_opts
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ class AMQPClient(object): # pylint: disable=too-many-instance-attributes
:keyword custom_endpoint_address: The custom endpoint address to use for establishing a connection to
the service, allowing network requests to be routed through any application gateways or
other paths needed for the host environment. Default is None.
Unless specified otherwise, default transport type is TransportType.AmqpOverWebsockets.
If port is not specified in the `custom_endpoint_address`, by default port 443 will be used.
:paramtype custom_endpoint_address: str
:keyword connection_verify: Path to the custom CA_BUNDLE file of the SSL certificate which is used to
Expand Down Expand Up @@ -561,6 +562,7 @@ class SendClient(AMQPClient):
:keyword custom_endpoint_address: The custom endpoint address to use for establishing a connection to
the service, allowing network requests to be routed through any application gateways or
other paths needed for the host environment. Default is None.
Unless specified otherwise, default transport type is TransportType.AmqpOverWebsockets.
If port is not specified in the `custom_endpoint_address`, by default port 443 will be used.
:paramtype custom_endpoint_address: str
:keyword connection_verify: Path to the custom CA_BUNDLE file of the SSL certificate which is used to
Expand Down Expand Up @@ -786,6 +788,7 @@ class ReceiveClient(AMQPClient): # pylint:disable=too-many-instance-attributes
:keyword custom_endpoint_address: The custom endpoint address to use for establishing a connection to
the service, allowing network requests to be routed through any application gateways or
other paths needed for the host environment. Default is None.
Unless specified otherwise, default transport type is TransportType.AmqpOverWebsockets.
If port is not specified in the `custom_endpoint_address`, by default port 443 will be used.
:paramtype custom_endpoint_address: str
:keyword connection_verify: Path to the custom CA_BUNDLE file of the SSL certificate which is used to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ class EventHubConsumerClient(ClientBaseAsync): # pylint: disable=client-accepts
:keyword custom_endpoint_address: The custom endpoint address to use for establishing a connection to
the Event Hubs service, allowing network requests to be routed through any application gateways or
other paths needed for the host environment. Default is None.
Unless specified otherwise, default transport type is TransportType.AmqpOverWebsockets.
The format would be like "sb://<custom_endpoint_hostname>:<custom_endpoint_port>".
If port is not specified in the `custom_endpoint_address`, by default port 443 will be used.
:paramtype custom_endpoint_address: Optional[str]
Expand Down Expand Up @@ -319,6 +320,7 @@ def from_connection_string(
:keyword custom_endpoint_address: The custom endpoint address to use for establishing a connection to
the Event Hubs service, allowing network requests to be routed through any application gateways or
other paths needed for the host environment. Default is None.
Unless specified otherwise, default transport type is TransportType.AmqpOverWebsockets.
The format would be like "sb://<custom_endpoint_hostname>:<custom_endpoint_port>".
If port is not specified in the `custom_endpoint_address`, by default port 443 will be used.
:paramtype custom_endpoint_address: Optional[str]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ class EventHubProducerClient(ClientBaseAsync): # pylint: disable=client-accepts
:keyword custom_endpoint_address: The custom endpoint address to use for establishing a connection to
the Event Hubs service, allowing network requests to be routed through any application gateways or
other paths needed for the host environment. Default is None.
Unless specified otherwise, default transport type is TransportType.AmqpOverWebsockets.
The format would be like "sb://<custom_endpoint_hostname>:<custom_endpoint_port>".
If port is not specified in the `custom_endpoint_address`, by default port 443 will be used.
:paramtype custom_endpoint_address: Optional[str]
Expand Down Expand Up @@ -471,6 +472,7 @@ def from_connection_string(
:keyword custom_endpoint_address: The custom endpoint address to use for establishing a connection to
the Event Hubs service, allowing network requests to be routed through any application gateways or
other paths needed for the host environment. Default is None.
Unless specified otherwise, default transport type is TransportType.AmqpOverWebsockets.
The format would be like "sb://<custom_endpoint_hostname>:<custom_endpoint_port>".
If port is not specified in the `custom_endpoint_address`, by default port 443 will be used.
:paramtype custom_endpoint_address: Optional[str]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def __init__(self, **kwargs):
if self.custom_endpoint_address.find("//") == -1:
self.custom_endpoint_address = "sb://" + self.custom_endpoint_address
endpoint = urlparse(self.custom_endpoint_address)
self.transport_type = TransportType.AmqpOverWebsocket
self.transport_type = kwargs.get("transport_type") or TransportType.AmqpOverWebsocket
self.custom_endpoint_hostname = endpoint.hostname
if amqp_transport.KIND == "pyamqp":
self.custom_endpoint_address += "/$servicebus/websocket"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,10 +134,15 @@ def __init__( # pylint:disable=too-many-locals,too-many-statements
# Custom Endpoint
custom_endpoint_address = kwargs.get("custom_endpoint_address")
custom_endpoint = None
custom_port = None
if custom_endpoint_address:
custom_parsed_url = urlparse(custom_endpoint_address)
custom_port = custom_parsed_url.port or WEBSOCKET_PORT
custom_endpoint = f"{custom_parsed_url.hostname}:{custom_port}{custom_parsed_url.path}"
if transport_type.value == TransportType.Amqp.value:
custom_port = custom_parsed_url.port or SECURE_PORT
custom_endpoint = f"{custom_parsed_url.hostname}"
else:
custom_port = custom_parsed_url.port or WEBSOCKET_PORT
custom_endpoint = f"{custom_parsed_url.hostname}:{custom_port}{custom_parsed_url.path}"
self._container_id = container_id or str(uuid.uuid4())
self._network_trace = network_trace
self._network_trace_params = {"amqpConnection": self._container_id, "amqpSession": "", "amqpLink": ""}
Expand All @@ -163,6 +168,7 @@ def __init__( # pylint:disable=too-many-locals,too-many-statements
credential=kwargs["sasl_credential"],
port=self._port,
custom_endpoint=custom_endpoint,
custom_port=custom_port,
socket_timeout=self._socket_timeout,
network_trace_params=self._network_trace_params,
**kwargs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -491,9 +491,13 @@ class SSLTransport(_AbstractTransport):

def __init__(self, host, *, port=AMQPS_PORT, socket_timeout=None, ssl_opts=None, **kwargs):
self.sslopts = ssl_opts if isinstance(ssl_opts, dict) else {}
self.sslopts["server_hostname"] = host
self._custom_endpoint = kwargs.get("custom_endpoint")
self._custom_port = kwargs.get("custom_port")
self.sslopts["server_hostname"] = self._custom_endpoint or host
self._read_buffer = BytesIO()
super(SSLTransport, self).__init__(host, port=port, socket_timeout=socket_timeout, **kwargs)
super(SSLTransport, self).__init__(
self._custom_endpoint or host, port=self._custom_port or port, socket_timeout=socket_timeout, **kwargs
)

def _setup_transport(self):
"""Wrap the socket in an SSL object."""
Expand Down
Loading