Skip to content

Commit

Permalink
chore: adding incremental progress
Browse files Browse the repository at this point in the history
  • Loading branch information
Armando Bañuelos committed Jul 10, 2024
1 parent 50cd9ef commit fdee779
Show file tree
Hide file tree
Showing 5 changed files with 240 additions and 38 deletions.
5 changes: 4 additions & 1 deletion examples/metadrive/basic.scenic
Original file line number Diff line number Diff line change
@@ -1 +1,4 @@
model scenic.simulators.metadrive.model
param map = localPath('../../assets/maps/CARLA/Town01.xodr')
model scenic.simulators.metadrive.model

# car = new Car
Empty file.
62 changes: 61 additions & 1 deletion src/scenic/simulators/metadrive/model.scenic
Original file line number Diff line number Diff line change
@@ -1,2 +1,62 @@
from scenic.simulators.metadrive.simulator import MetaDriveSimulator
simulator MetaDriveSimulator()
from scenic.domains.driving.model import *
import pathlib

# TODO: research on open drive conversion for metadrive
map_town = pathlib.Path(globalParameters.map).stem
param map = map_town
param timestep = 0.1
param render = 0

simulator MetaDriveSimulator(
timestep=float(globalParameters.timestep),
render=bool(globalParameters.render),
metadrive_map=globalParameters.map,
)

class MetaDriveActor(DrivingObject):
"""Abstract class for MetaDrive objects.
Properties:
metaDriveActor (dynamic): Set during simulations to the ``metaDrive.Actor`` representing this
object.
blueprint (str): Identifier of the MetaDrive blueprint specifying the type of object.
rolename (str): Can be used to differentiate specific actors during runtime. Default
value ``None``.
"""
metaDriveActor: None
rolename: None

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

def setPosition(self, pos):
self.metaDriveActor.set_location(pos)

def setVelocity(self, vel):
self.metaDriveActor.set_velocity(vel, 1, in_local_frame=True)


class Vehicle(Vehicle, MetaDriveActor):
"""Abstract class for steerable vehicles."""

def setThrottle(self, throttle):
self.metaDriveActor._apply_throttle_brake(0)

def setSteering(self, steering):
self.control.steer = steering

def setBraking(self, braking):
self.control.brake = braking

def setHandbrake(self, handbrake):
self.control.hand_brake = handbrake

def setReverse(self, reverse):
self.control.reverse = reverse

class Car:
blueprint: None
@property
def isCar(self):
return True
152 changes: 116 additions & 36 deletions src/scenic/simulators/metadrive/simulator.py
Original file line number Diff line number Diff line change
@@ -1,45 +1,106 @@
from scenic.core.simulators import (
Simulation,
SimulationCreationError,
Simulator,
SimulatorInterfaceWarning,
)
try:
from metadrive.component.vehicle.vehicle_type import DefaultVehicle
from metadrive.component.map.base_map import BaseMap
from metadrive.component.map.pg_map import MapGenerateMethod
from metadrive.examples.ppo_expert.numpy_expert import expert
from metadrive.utils.opendrive.map_load import load_opendrive_map
from metadrive.component.road_network.edge_road_network import OpenDriveRoadNetwork
from metadrive.component.opendrive_block.opendrive_block import OpenDriveBlock
from metadrive.policy.idm_policy import IDMPolicy
from metadrive.utils.draw_top_down_map import draw_top_down_map
except ImportError as e:
raise ModuleNotFoundError('Metadrive scenarios require the "metadrive" Python package') from e

from metadrive import TopDownMetaDrive
from metadrive.examples.ppo_expert.numpy_expert import expert
import random
from .utils import DriveEnv
import matplotlib.pyplot as plt
from scenic.core.simulators import SimulationCreationError
from scenic.domains.driving.simulators import DrivingSimulation, DrivingSimulator
from scenic.domains.driving.roads import Network
from metadrive.engine.asset_loader import AssetLoader
from metadrive.engine.asset_loader import initialize_asset_loader

class MetaDriveSimulator(Simulator):
def __init__(self):
self.client = TopDownMetaDrive(
dict(
# We also support using two renderer (Panda3D renderer and Pygame renderer) simultaneously. You can
# try this by uncommenting next line.
# use_render=True,

# You can also try to uncomment next line with "use_render=True", so that you can control the ego vehicle
# with keyboard in the main window.
# manual_control=True,
num_agents=1,
map="SSSS",
traffic_density=0,
num_scenarios=1,
# start_seed=random.randint(0, 1000),
class MetaDriveSimulator(DrivingSimulator):
def __init__(
self,
timestep=0.1,
render=False,
metadrive_map={},
):
super().__init__()
# check to see if metadrive map is active
self.render = render

self.client = DriveEnv(
dict(
agent_policy=IDMPolicy,
use_render=self.render,
# map_config=self.metadrive_map,
traffic_density=0,
)
)
o, _ = self.client.reset()
super().__init__()

if metadrive_map is not None:
map_data = load_opendrive_map(metadrive_map)
global_network = OpenDriveRoadNetwork()
blocks = []
for road in map_data.roads:
for section in road.lanes.lane_sections:
block = OpenDriveBlock(len(blocks), global_network, 0, section)
block.construct_block(self.client.engine.render, self.client.engine.physics_world)
blocks.append(block)
# import pdb; pdb.set_trace()
# lanes = [lane_info.lane for lane_info in global_network.graph.values()]
# for lane in lanes:
# import pdb; pdb.set_trace()
else:
self.metadrive_map={
BaseMap.GENERATE_TYPE: MapGenerateMethod.BIG_BLOCK_SEQUENCE,
BaseMap.GENERATE_CONFIG: "rCR",
BaseMap.LANE_WIDTH: 3.5,
BaseMap.LANE_NUM: 2
}
self.scenario_number = 0
self.timestep = timestep

# self.cleanup() # Ensure cleanup before reset
# import pdb;pdb.set_trace()
# self.client.reset()
# current_map = draw_top_down_map(self.client.current_map)
# fig, ax = plt.subplots(figsize=(10, 10)) # Adjust figsize as needed
# draw_top_down_map(current_map, ax)
# ax.set_aspect('equal') # Ensure aspect ratio is maintained
# plt.show()
self.client.agent_manager.reset()
# self.client.reset()

def createSimulation(self, scene, **kwargs):
return MetaDriveSimulation(self, scene, self.client, **kwargs)
def createSimulation(self, scene, *, timestep, **kwargs):
if timestep is not None and timestep != self.timestep:
raise RuntimeError(
"cannot customize timestep for individual MetaDrive simulations; "
"set timestep when creating the MetaDriveSimulator instead"
)
self.scenario_number += 1
return MetaDriveSimulation(
scene,
self.client,
self.render,
self.scenario_number,
timestep=self.timestep,
**kwargs,
)

def destroy(self):
self.client.close()
super().destroy()

class MetaDriveSimulation(Simulation):
def __init__(self, simulator, scene, client, **kwargs):
self.simulator = simulator

class MetaDriveSimulation(DrivingSimulation):
def __init__(self, scene, client, render, scenario_number, **kwargs):
self.client = client
self.render = render
self.scenario_number = scenario_number
self.client = client
super().__init__(scene, **kwargs)

Expand All @@ -50,18 +111,37 @@ def step(self):
# import pdb; pdb.set_trace()
# for i in range(1, 100000):
# o, r, tm, tc, info = self.client.step(expert(self.client.agent))
o, r, tm, tc, info = self.client.step(expert(self.client.agent))
self.client.render(mode="top_down", screen_size=(500, 500),
screen_record=True,
window=True)
try:
o, r, tm, tc, info = self.client.step([0,3])
self.client.render(mode="top_down", screen_size=(500, 500),
screen_record=True,
window=True)
except Exception as e:
print(f"Error during step: {e}")
# self.client.render(mode="top_down", text={"Quit": "ESC"}, film_size=(2000, 2000))


def createObjectInSimulator(self, obj):
return #super().createObjectInSimulator(obj)
import pdb; pdb.set_trace()
if isinstance(obj.metaDriveActor, DefaultVehicle):
metadriveActor = self.client.engine.spawn_object(DefaultVehicle,
vehicle_config=dict(),
position=(0, 0),
heading=0)
# metadriveActor.setThrottle(0)


return metadriveActor
# if obj.rolename is not None:
# blueprint.set_attribute("role_name", obj.rolename)
# return

def destroy(self):
self.client.reset()
try:
self.client.reset()
except AssertionError as ae:
print(f"Assertion error during reset: {ae}")

super().destroy()

def getProperties(self, obj, properties):
Expand Down
59 changes: 59 additions & 0 deletions src/scenic/simulators/metadrive/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
from metadrive.envs import MetaDriveEnv
from metadrive.manager import BaseManager
from metadrive.component.vehicle.vehicle_type import DefaultVehicle
from metadrive.policy.idm_policy import IDMPolicy
from metadrive.component.traffic_participants.pedestrian import Pedestrian

class DriveManager(BaseManager):
def __init__(self):
super(DriveManager, self).__init__()
self.generated_v = None
self.ped = None
self.generate_ts = 0

def cleanup_physics_bodies(self):
for obj in self.get_objects():
print(obj)
# import pdb; pdb.set_trace()
# obj.cleanup_physics_body() # Implement this method as needed

def reset(self):
self.cleanup_physics_bodies()
super().reset() # Call super method after cleanup

def before_step(self):
# if self.generated_v:
# p = self.get_policy(self.generated_v.id)
# self.generated_v.before_step(p.act())
self.cleanup_physics_bodies()
pass


def after_step(self):
# if self.episode_step == self.generate_ts:
# self.generated_v = self.spawn_object(DefaultVehicle,
# vehicle_config=dict(),
# position=(10, 0),
# heading=0)
# self.add_policy(self.generated_v.id, IDMPolicy, self.generated_v, self.generate_seed())
# self.ped = self.spawn_object(
# Pedestrian,
# name="ped",
# position=(50,0),
# heading_theta=1,
# )
# self.ped.set_velocity([1, 0], 1, in_local_frame=True)
pass



# elif self.episode_step == self.recycle_ts:
# self.clear_objects([self.generated_v.id])
# self.generated_v = None
# elif self.generated_v:
# self.generated_v.after_step()

class DriveEnv(MetaDriveEnv):
def setup_engine(self):
super(DriveEnv, self).setup_engine()
self.engine.register_manager("drive_mgr", DriveManager())

0 comments on commit fdee779

Please sign in to comment.