Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(Azure): multiprocessing #53

Draft
wants to merge 13 commits into
base: worker-pool-v4.0
Choose a base branch
from
Draft
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
WIP:
- add provider to log (aws only for now)
- change add_seed to use a list and submit_seed_payload
- change add_cloud_asset to use a map + submit_cloud_asset_payload
- rough and ready aurora client
  • Loading branch information
Eric Butera committed Oct 20, 2023

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
commit b482c3e5d58a76ce93b8e0738ce8d9c94676b839
183 changes: 156 additions & 27 deletions src/censys/cloud_connectors/aws_connector/connector.py
Original file line number Diff line number Diff line change
@@ -147,7 +147,7 @@ def scan_seeds(self, **kwargs):
logger = get_logger(
log_name=f"{self.provider.lower()}_cloud_connector",
level=self.settings.logging_level,
# TODO: extra - add account + region to log lines
provider=f"{self.provider}_{scan_context.account_number}_{scan_context.region}",
)
scan_context.logger = logger

@@ -300,6 +300,7 @@ def format_label(
"""
region_label = f"/{region}" if region != "" else ""
return f"AWS: {service} - {account_number}{region_label}"
# technically this should use self.provider.label() instead of hardcoding "AWS"

def credentials(self, ctx: AwsScanContext) -> dict:
"""Generate required credentials for AWS.
@@ -518,13 +519,21 @@ def get_api_gateway_domains_v1(self, **kwargs):
label = self.format_label(SeedLabel.API_GATEWAY, ctx.account_number, ctx.region)

try:
seeds = []
apis = client.get_rest_apis()
for domain in apis.get("items", []):
domain_name = f"{domain['id']}.execute-api.{ctx.region}.amazonaws.com"
with SuppressValidationError():
domain_seed = DomainSeed(value=domain_name, label=label)
# domain_seed = DomainSeed(value=domain_name, label=label)
# self.add_seed(domain_seed, api_gateway_res=domain)
self.emit_seed(ctx, domain_seed, api_gateway_res=domain)
# self.emit_seed(ctx, domain_seed, api_gateway_res=domain)
seed = self.process_seed(
DomainSeed(
value=domain_name, label=label, api_gateway_res=domain
)
)
seeds.append(seed)
self.submit_seed_payload(label, seeds)
except ClientError as e:
self.logger.error(f"Could not connect to API Gateway V1. Error: {e}")

@@ -539,13 +548,21 @@ def get_api_gateway_domains_v2(self, **kwargs):
label = self.format_label(SeedLabel.API_GATEWAY, ctx.account_number, ctx.region)

try:
seeds = []
apis = client.get_apis()
for domain in apis.get("Items", []):
domain_name = domain["ApiEndpoint"].split("//")[1]
with SuppressValidationError():
domain_seed = DomainSeed(value=domain_name, label=label)
# domain_seed = DomainSeed(value=domain_name, label=label)
# self.add_seed(domain_seed, api_gateway_res=domain)
self.emit_seed(ctx, domain_seed, api_gateway_res=domain)
# self.emit_seed(ctx, domain_seed, api_gateway_res=domain)
seed = self.process_seed(
DomainSeed(
value=domain_name, label=label, api_gateway_res=domain
)
)
seeds.append(seed)
self.submit_seed_payload(label, seeds)
except ClientError as e:
self.logger.error(f"Could not connect to API Gateway V2. Error: {e}")

@@ -568,13 +585,21 @@ def get_load_balancers_v1(self, **kwargs):
label = self.format_label(SeedLabel.LOAD_BALANCER, ctx.account_number, ctx.region)

try:
seeds = []
data = client.describe_load_balancers()
for elb in data.get("LoadBalancerDescriptions", []):
if value := elb.get("DNSName"):
with SuppressValidationError():
domain_seed = DomainSeed(value=value, label=label)
# domain_seed = DomainSeed(value=value, label=label)
# self.add_seed(domain_seed, elb_res=elb, aws_client=client)
self.emit_seed(ctx, domain_seed, elb_res=elb, aws_client=client)
# self.emit_seed(ctx, domain_seed, elb_res=elb, aws_client=client)
seed = self.process_seed(
DomainSeed(value=value, label=label),
elb_res=elb,
aws_client=client,
)
seeds.append(seed)
self.submit_seed_payload(label, seeds)
except ClientError as e:
self.logger.error(f"Could not connect to ELB V1. Error: {e}")

@@ -589,13 +614,21 @@ def get_load_balancers_v2(self, **kwargs):
label = self.format_label(SeedLabel.LOAD_BALANCER, ctx.account_number, ctx.region)

try:
seeds = []
data = client.describe_load_balancers()
for elb in data.get("LoadBalancers", []):
if value := elb.get("DNSName"):
with SuppressValidationError():
domain_seed = DomainSeed(value=value, label=label)
# domain_seed = DomainSeed(value=value, label=label)
# self.add_seed(domain_seed, elb_res=elb, aws_client=client)
self.emit_seed(ctx, domain_seed, elb_res=elb, aws_client=client)
# self.emit_seed(ctx, domain_seed, elb_res=elb, aws_client=client)
seed = self.process_seed(
DomainSeed(value=value, label=label),
elb_res=elb,
aws_client=client,
)
seeds.append(seed)
self.submit_seed_payload(label, seeds)
except ClientError as e:
self.logger.error(f"Could not connect to ELB V2. Error: {e}")

@@ -616,6 +649,7 @@ def get_network_interfaces(self, **kwargs):
interfaces = self.describe_network_interfaces(ctx)
instance_tags, instance_tag_sets = self.get_resource_tags(ctx)

seeds = []
for ip_address, record in interfaces.items():
instance_id = record["InstanceId"]
tags = instance_tags.get(instance_id)
@@ -626,9 +660,16 @@ def get_network_interfaces(self, **kwargs):
continue

with SuppressValidationError():
ip_seed = IpSeed(value=ip_address, label=label)
# ip_seed = IpSeed(value=ip_address, label=label)
# self.add_seed(ip_seed, tags=instance_tag_sets.get(instance_id))
self.emit_seed(ctx, ip_seed, tags=instance_tag_sets.get(instance_id))
# self.emit_seed(ctx, ip_seed, tags=instance_tag_sets.get(instance_id))
seed = self.process_seed(
IpSeed(value=ip_address, label=label),
tags=instance_tag_sets.get(instance_id),
)
seeds.append(seed)

self.submit_seed_payload(label, seeds)

def describe_network_interfaces(self, ctx: AwsScanContext) -> dict:
"""Retrieve EC2 Elastic Network Interfaces (ENI) data.
@@ -755,17 +796,24 @@ def get_rds_instances(self, **kwargs):
has_added_seeds = False

try:
seeds = []
data = client.describe_db_instances()
for instance in data.get("DBInstances", []):
if not instance.get("PubliclyAccessible"):
continue

if domain_name := instance.get("Endpoint", {}).get("Address"):
with SuppressValidationError():
domain_seed = DomainSeed(value=domain_name, label=label)
# domain_seed = DomainSeed(value=domain_name, label=label)
# self.add_seed(domain_seed, rds_res=instance)
# self.emit_seed(ctx, domain_seed, rds_res=instance)
has_added_seeds = True
self.emit_seed(ctx, domain_seed, rds_res=instance)
seed = self.process_seed(
DomainSeed(value=domain_name, label=label), rds_res=instance
)
seeds.append(seed)

self.submit_seed_payload(label, seeds)
if not has_added_seeds:
self.delete_seeds_by_label(label)
except ClientError as e:
@@ -814,7 +862,15 @@ def get_route53_zones(self, **kwargs):
)

has_added_seeds = False
# TODO: potentially send seeds with empty values to remove "stale" seeds

# Note: add_seed takes extra keyword arguments - these were piped through add_seed into dispatch_event
# - add_seed cannot use self.seeds anymore because of concurrency
# - add_seed dispatched an event PER seed, but the seeds were only submitted later in submit_seeds
# - for now I have split dispatch_event, add_seed, and submit_seed_payload into separate calls

try:
seeds = []
zones = self._get_route53_zone_hosts(client)
for zone in zones.get("HostedZones", []):
if zone.get("Config", {}).get("PrivateZone"):
@@ -823,10 +879,22 @@ def get_route53_zones(self, **kwargs):
# Add the zone itself as a seed
domain_name = zone.get("Name").rstrip(".")
with SuppressValidationError():
domain_seed = DomainSeed(value=domain_name, label=label)
# domain_seed = DomainSeed(value=domain_name, label=label)
has_added_seeds = True
# self.add_seed(domain_seed, route53_zone_res=zone, aws_client=client)
self.emit_seed(ctx, domain_seed, route53_zone_res=zone)
seed = self.process_seed(
DomainSeed(value=domain_name, label=label),
route53_zone_res=zone,
aws_client=client,
)
seeds.append(seed)
# self.emit_seed(ctx, domain_seed, route53_zone_res=zone)
# self.dispatch_event(
# EventTypeEnum.SEED_FOUND,
# seed=domain_seed,
# route53_zone_res=zone,
# aws_client=client,
# )
# seeds.append(domain_seed)

id = zone.get("Id")
resource_sets = self._get_route53_zone_resources(client, id)
@@ -837,11 +905,33 @@ def get_route53_zones(self, **kwargs):

domain_name = resource_set.get("Name").rstrip(".")
with SuppressValidationError():
domain_seed = DomainSeed(value=domain_name, label=label)
self.add_seed(
domain_seed, route53_zone_res=zone, aws_client=client
# domain_seed = DomainSeed(value=domain_name, label=label)
#
# TODO: label is for this entire loop, emitting per item will make more requests than necessary!
# seeds[seed.label].push(seed)
# self.add_seed(domain_seed, route53_zone_res=zone, aws_client=client)
# TODO: add_seed quadratic time - loops here, then loops to submit seed
# self.emit_seed(
# ctx, domain_seed, route53_zone_res=zone, aws_client=client
# )
#
# self.dispatch_event(
# EventTypeEnum.SEED_FOUND,
# seed=domain_seed,
# route53_zone_res=zone,
# aws_client=client,
# )
# seeds.append(domain_seed)

seed = self.process_seed(
DomainSeed(value=domain_name, label=label),
route53_zone_res=zone,
aws_client=client,
)
seeds.append(seed)
has_added_seeds = True

self.submit_seed_payload(label, seeds)
if not has_added_seeds:
self.delete_seeds_by_label(label)
except ClientError as e:
@@ -858,6 +948,7 @@ def get_ecs_instances(self, **kwargs):

has_added_seeds = False
try:
seeds = []
clusters = ecs.list_clusters()
for cluster in clusters.get("clusterArns", []):
cluster_instances = ecs.list_container_instances(cluster=cluster)
@@ -884,10 +975,22 @@ def get_ecs_instances(self, **kwargs):
continue

with SuppressValidationError():
ip_seed = IpSeed(value=ip_address, label=label)
# ip_seed = IpSeed(value=ip_address, label=label)
# TODO: don't use add_seed
# instead, emit Payload
# modifying add seed would require managing account+region or use AwsScanContext which requires more time than available
# self.add_seed(ip_seed, ecs_res=instance)
# self.emit_seed(ctx, ip_seed, ecs_res=instance)
# or maybe self.enqueue(seed)
# would be best to async queue these
# but we are in a pool already...
seed = self.process_seed(
IpSeed(value=ip_address, label=label), ecs_res=instance
)
seeds.append(seed)
has_added_seeds = True
self.emit_seed(ctx, ip_seed, ecs_res=instance)

self.submit_seed_payload(label, seeds)
if not has_added_seeds:
self.delete_seeds_by_label(label)
except ClientError as e:
@@ -908,7 +1011,6 @@ def get_s3_region(self, client: S3Client, bucket: str) -> str:

def get_s3_instances(self, **kwargs):
"""Retrieve Simple Storage Service data and emit seeds."""
# TODO: how to pass in cred,region?
key = kwargs["scan_context_key"]
ctx: AwsScanContext = self.scan_contexts[key]

@@ -917,30 +1019,57 @@ def get_s3_instances(self, **kwargs):
try:
data = client.list_buckets().get("Buckets", [])

# TODO: this should actually be a set of buckets, not a list (no dupes)
# findings = { 'uid1=AWS: 123456789012/us-east-1': [asset,...], 'uid2=AWS: 123456789012/us-west-1': [asset,...]}
findings: dict[str, list[AwsStorageBucketAsset]] = {}

for bucket in data:
bucket_name = bucket.get("Name")
if not bucket_name:
continue

# TODO: figure out correct value for region
# if we use lookup_region, then the submit_cloud_asset_payload call will need to be adjusted
# it shouldn't be submitting a payload PER bucket; it should be payload per account + region
lookup_region = self.get_s3_region(client, bucket_name)
label = self.format_label(
SeedLabel.STORAGE_BUCKET,
ctx.account_number,
ctx.region,
# Open question: should this be lookup_region or ctx.region?
# Pretty sure it's lookup_region; otherwise, what's the point of looking up the bucket's region?
lookup_region,
# ctx.region,
)

# TODO: this isn't right
# assets = []

with SuppressValidationError():
bucket_asset = AwsStorageBucketAsset(
asset = AwsStorageBucketAsset(
value=AwsStorageBucketAsset.url(bucket_name, lookup_region),
uid=label,
scan_data={
"accountNumber": ctx.account_number,
},
)
# self.add_cloud_asset(bucket_asset, bucket_name=bucket_name, aws_client=client)
self.emit_cloud_asset(
ctx, bucket_asset, bucket_name=bucket_name, aws_client=client
# self.add_cloud_asset(asset, bucket_name=bucket_name, aws_client=client)
# self.emit_cloud_asset(
# ctx, asset, bucket_name=bucket_name, aws_client=client
# )
asset = self.process_cloud_asset(
asset, bucket_name=bucket_name, aws_client=client
)
# assets.append(asset)
if label not in findings:
findings[label] = []
findings[label].append(asset)

# TODO convert this to findings below
# self.submit_cloud_asset_payload(label, assets)

# TODO: submit findings map here
for label, assets in findings.items():
self.submit_cloud_asset_payload(label, assets)
except ClientError as e:
self.logger.error(f"Could not connect to S3. Error: {e}")

Loading