fixed controller
yindia committed Oct 16, 2024
1 parent 8c8fe77 commit 9f303b1
Showing 7 changed files with 158 additions and 97 deletions.
95 changes: 14 additions & 81 deletions README.md
@@ -39,12 +39,6 @@ make bootstrap

# Run Database
docker-compose up -d

# Install river
go install github.com/riverqueue/river/cmd/river@latest

# Run River migration (It will create the river resource in the database)
river migrate-up --database-url "postgres://admin:[email protected]:5432/tasks?sslmode=disable"
```


@@ -75,7 +69,7 @@ Access at https://127.0.0.1:3000
### 5. Worker (Data Plane)
Start worker instances:
```bash
./bin/task-cli serve -n 10
./bin/task-cli serve --log-level debug
```

## Project Structure
@@ -113,16 +107,20 @@ graph TD
A[Dashboard Client] -->|Sends Request| B(Server)
C[CLI Client] -->|Sends Request| B(Server)
%% Server and its connections
B(Server) -->|Reads/Writes| D[(PostgreSQL Database)]
B(Server) -->|Publishes Message| E(RiverQueue)
%% RabbitMQ and Worker
E(RiverQueue) -->|Sends Message| F(Worker)
F(Worker) -->|Consumes Data| G[Executes Work]
%% Control Plane
subgraph Control Plane
B(Server) -->|Reads/Writes| D[(PostgreSQL Database)]
end
%% Optional back-and-forth communication if needed
F(Worker) -->|Update Status| B[(Server)]
%% Data Plane
subgraph Data Plane
E[Agent] -->|Initiates Connection| B[Server]
B[Server] -->|Publish W| E[Agent]
E -->|Creates CRD| H[CRD]
F[Controller] -->|Watches CRD| H
F -->|Executes Task| J[Task Execution]
F -->|Sends Status Update| B
end
```

This architecture allows for:
@@ -267,20 +265,6 @@ graph TD
K --> L
```

Reconciliation job (runs every 10 minutes as a background job):

```mermaid
graph TD
%% Reconciliation Job Flow
subgraph Reconciliation Job
M[Start Reconciliation Job] --> N[Get List of Stuck Jobs]
N --> O{Jobs Found?}
O -->|Yes| P[Update Status: Queued]
P --> Q[Enqueue Message to River Queue]
O -->|No| R[End Reconciliation Job]
Q --> R
end
```

## API Documentation
- [Proto Docs](https://buf.build/evalsocket/cloud)
@@ -568,54 +552,3 @@ kind delete cluster --name task-service
This setup allows you to test the entire Task Service stack, including the server, workers, and dependencies, in a local Kubernetes environment. It's an excellent way to validate the Helm charts and ensure everything works together as expected in a Kubernetes setting.


## Future Improvements

As we continue to evolve the Task Service, we are exploring several enhancements to improve its scalability, reliability, and management.

### Kubernetes-Native Task Execution

We are considering leveraging Kubernetes Custom Resource Definitions (CRDs) and custom controllers to manage task execution. This approach would enable us to fully utilize Kubernetes' scheduling and scaling capabilities.

#### High-Level Architecture

```mermaid
graph TD
%% Clients
A[Dashboard Client] -->|Sends Request| B(Server)
C[CLI Client] -->|Sends Request| B(Server)
%% Control Plane
subgraph Control Plane
B(Server) -->|Reads/Writes| D[(PostgreSQL Database)]
end
%% Data Plane
subgraph Data Plane
E[Agent] -->|Initiates Connection| B[Server]
E -->|Creates CRD| H[CRD]
F[Controller] -->|Watches CRD| H
F -->|Creates Pod for Task| I[Pod]
I -->|Executes Task| J[Task Execution]
F -->|Sends Status Update| B
end
```

In this architecture:

1. Our agent initiates a streaming connection with the control plane and listens for events.
2. When a new task is created, the control plane generates an event for the agent.
3. Upon receiving the event, the agent creates a custom resource (an instance of the Task CRD) for the task in Kubernetes.
4. A custom Worker Controller watches for these custom resources and creates pods to execute the tasks.
5. Each task runs in its own pod, allowing for improved isolation and resource management.
6. The Worker Controller monitors task execution and sends status updates back to the server.


#### Design Advantages

- **Separation of Concerns**: The customer does not need to open a port; our agent initiates the connection, and only the agent has permission to create resources inside the Data Plane.
- **Single Point of Setup**: Only the agent is required to set up the Data Plane, creating the necessary resources such as the controller, CRD, and other components.
- **Multiple Data Planes**: Customers can run multiple Data Planes with one Control Plane based on their requirements (from bare metal to any cloud). In the future, we can add functionality to route tasks to specific Data Planes as needed.
- **Security**: No sensitive information is stored in the Control Plane; we only retain metadata, ensuring enhanced security.
- **Infinite Scalability**: The architecture supports scaling as needed to accommodate varying workloads.
- **Co-location Flexibility**: Customers can run both the Data Plane and Control Plane together inside their VPC for easier management.
- **Secure Storage**: All input parameters are stored as S3 objects, with only references to these objects kept in the metadata, optimizing storage usage.
4 changes: 1 addition & 3 deletions cli/cmd/agent.go
@@ -72,7 +72,7 @@ func runStreamConnection(ctx context.Context, wg *sync.WaitGroup, logger *slog.L
var err error

client := cloudv1connect.NewTaskManagementServiceClient(http.DefaultClient, "http://localhost:8080")
k8sClient, err := k8s.NewK8sClient("")
k8sClient, err := k8s.NewK8sClient("/Users/yuvraj/.kube/config")
if err != nil {
return fmt.Errorf("failed to create k8s client: %w", err)
}
@@ -91,8 +91,6 @@ func runStreamConnection(ctx context.Context, wg *sync.WaitGroup, logger *slog.L

go processWork(ctx, stream.Msg(), logger, k8sClient)
}

return nil
}

// sendPeriodicRequests sends periodic heartbeat requests to the server.
4 changes: 2 additions & 2 deletions controller/cmd/main.go
@@ -37,7 +37,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/webhook"

taskiov1 "task/controller/api/v1"
"task/controller/internal/controller"
controller "task/controller/internal/controller"
"task/pkg/gen/cloud/v1/cloudv1connect"
// +kubebuilder:scaffold:imports
)
@@ -149,7 +149,7 @@ func main() {
if err = (&controller.TaskReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
cloudClient: cloudv1connect.NewTaskManagementServiceClient(http.DefaultClient, "https://localhost:8080"),
CloudClient: cloudv1connect.NewTaskManagementServiceClient(http.DefaultClient, "https://localhost:8080"),
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Task")
os.Exit(1)
92 changes: 92 additions & 0 deletions controller/config/crd/bases/task.io_tasks.yaml
@@ -0,0 +1,92 @@
---
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
controller-gen.kubebuilder.io/version: v0.16.1
name: tasks.task.io
spec:
group: task.io
names:
kind: Task
listKind: TaskList
plural: tasks
singular: task
scope: Namespaced
versions:
- name: v1
schema:
openAPIV3Schema:
description: Task is the Schema for the tasks API
properties:
apiVersion:
description: |-
APIVersion defines the versioned schema of this representation of an object.
Servers should convert recognized schemas to the latest internal value, and
may reject unrecognized values.
More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources
type: string
kind:
description: |-
Kind is a string value representing the REST resource this object represents.
Servers may infer this from the endpoint the client submits requests to.
Cannot be updated.
In CamelCase.
More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds
type: string
metadata:
type: object
spec:
description: TaskSpec defines the desired state of Task
properties:
created_at:
description: CreatedAt is the timestamp of when the task was created.
type: string
description:
description: Description is a description of the task.
type: string
id:
description: ID is the unique identifier for the task.
format: int32
type: integer
name:
description: Name is the name of the task.
type: string
payload:
description: Payload contains task parameters.
properties:
parameters:
additionalProperties:
type: string
description: Parameters are dynamic key-value pairs for task parameters.
type: object
type: object
priority:
description: Priority is the priority level of the task.
format: int32
type: integer
retries:
description: Retries is the number of retries attempted for this task.
format: int32
type: integer
status:
description: Status is the current status of the task.
format: int32
type: integer
type:
description: Type is the type of the task.
type: string
type: object
status:
description: TaskStatus defines the observed state of Task
properties:
status:
description: Status is the current status of the task.
format: int32
type: integer
type: object
type: object
served: true
storage: true
subresources:
status: {}
33 changes: 27 additions & 6 deletions controller/config/rbac/role.yaml
@@ -1,11 +1,32 @@
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
labels:
app.kubernetes.io/name: controller
app.kubernetes.io/managed-by: kustomize
name: manager-role
rules:
- apiGroups: [""]
resources: ["pods"]
verbs: ["get", "list", "watch"]
- apiGroups:
- task.io
resources:
- tasks
verbs:
- create
- delete
- get
- list
- patch
- update
- watch
- apiGroups:
- task.io
resources:
- tasks/finalizers
verbs:
- update
- apiGroups:
- task.io
resources:
- tasks/status
verbs:
- get
- patch
- update
4 changes: 2 additions & 2 deletions controller/internal/controller/task_controller.go
@@ -37,7 +37,7 @@ import (
type TaskReconciler struct {
client.Client
Scheme *runtime.Scheme
cloudClient cloudv1connect.TaskManagementServiceClient
CloudClient cloudv1connect.TaskManagementServiceClient
}

// +kubebuilder:rbac:groups=task.io,resources=tasks,verbs=get;list;watch;create;update;patch;delete
@@ -121,7 +121,7 @@ func (r *TaskReconciler) SetupWithManager(mgr ctrl.Manager) error {

// updateTaskStatus updates the status of a task using the Task Management Service.
func (r *TaskReconciler) updateTaskStatus(ctx context.Context, taskID int64, status cloudv1.TaskStatusEnum, message string) error {
_, err := r.cloudClient.UpdateTaskStatus(ctx, connect.NewRequest(&cloudv1.UpdateTaskStatusRequest{
_, err := r.CloudClient.UpdateTaskStatus(ctx, connect.NewRequest(&cloudv1.UpdateTaskStatusRequest{
Id: int32(taskID),
Status: status,
Message: message,
23 changes: 20 additions & 3 deletions pkg/k8s/k8s.go
@@ -5,14 +5,18 @@ import (
"os"
v1 "task/controller/api/v1"

"k8s.io/apimachinery/pkg/runtime" // Import runtime for scheme
// Import for GroupVersionKind
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
// Import for error handling
)

type K8s struct {
kubeconfigPath string
client *kubernetes.Clientset
scheme *runtime.Scheme // Add scheme to K8s struct
}

// Add the following function to create a Kubernetes client for local and in-cluster setup
@@ -33,10 +37,23 @@ func NewK8sClient(kubeconfigPath string) (*K8s, error) {
}
}

return &K8s{
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, err
}

k := &K8s{
kubeconfigPath: kubeconfigPath,
client: kubernetes.NewForConfigOrDie(config),
}, nil
client: clientset,
scheme: runtime.NewScheme(), // Initialize the scheme
}

// Register your Task type with the scheme
if err := v1.AddToScheme(k.scheme); err != nil {
return nil, err
}

return k, nil
}

// CreateTask creates a new Task resource in the Kubernetes cluster