Add Kafka Ingestion Spec Submission #189

Merged (1 commit) on Nov 14, 2024
7 changes: 7 additions & 0 deletions chart/templates/rbac_manager.yaml
@@ -178,6 +178,13 @@ rules:
      - get
      - list
      - watch
+  - apiGroups:
+      - ""
+    resources:
+      - secrets
+    verbs:
+      - get
+      - list
{{- end }}
{{- end }}
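The new rule grants the manager read access to Secrets, presumably so ingestion auth credentials can be resolved at reconcile time. A hypothetical sketch of that kind of read with controller-runtime; the Secret namespace, name, keys, and helper are illustrative, not code from this PR:

package main

import (
	"context"
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/types"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/client/config"
)

// credsFromSecret reads basic-auth credentials from a Secret; the get
// verb added above is what a read like this needs.
func credsFromSecret(ctx context.Context, c client.Client, ns, name string) (string, string, error) {
	var s corev1.Secret
	if err := c.Get(ctx, types.NamespacedName{Namespace: ns, Name: name}, &s); err != nil {
		return "", "", err
	}
	return string(s.Data["username"]), string(s.Data["password"]), nil
}

func main() {
	cfg, err := config.GetConfig()
	if err != nil {
		panic(err)
	}
	c, err := client.New(cfg, client.Options{})
	if err != nil {
		panic(err)
	}
	// Illustrative namespace and name only.
	user, _, err := credsFromSecret(context.Background(), c, "druid", "druid-auth")
	if err != nil {
		panic(err)
	}
	fmt.Println("resolved user:", user)
}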

47 changes: 42 additions & 5 deletions controllers/ingestion/reconciler.go
@@ -125,13 +125,16 @@ func (r *DruidIngestionReconciler) CreateOrUpdate(
	// check whether a task id is already recorded in status
	if di.Status.TaskId == "" && di.Status.CurrentIngestionSpec == "" {
		// if not, create the task

		postHttp := internalhttp.NewHTTPClient(
			&http.Client{},
			&auth,
		)

-		respCreateTask, err := postHttp.Do(http.MethodPost, getPath(di.Spec.Ingestion.Type, svcName, http.MethodPost, "", false), []byte(di.Spec.Ingestion.Spec))
+		respCreateTask, err := postHttp.Do(
+			http.MethodPost,
+			getPath(di.Spec.Ingestion.Type, svcName, http.MethodPost, "", false),
+			[]byte(di.Spec.Ingestion.Spec),
+		)

		if err != nil {
			return controllerutil.OperationResultNone, err
@@ -203,7 +206,11 @@ func (r *DruidIngestionReconciler) CreateOrUpdate(
		&auth,
	)

-	respUpdateSpec, err := postHttp.Do(http.MethodPost, getPath(di.Spec.Ingestion.Type, svcName, http.MethodPost, "", false), []byte(di.Spec.Ingestion.Spec))
+	respUpdateSpec, err := postHttp.Do(
+		http.MethodPost,
+		getPath(di.Spec.Ingestion.Type, svcName, http.MethodPost, "", false),
+		[]byte(di.Spec.Ingestion.Spec),
+	)
	if err != nil {
		return controllerutil.OperationResultNone, err
	}
@@ -290,6 +297,13 @@ func getPath(
	}
	case v1alpha1.HadoopIndexHadoop:
	case v1alpha1.Kafka:
+		if httpMethod == http.MethodGet {
+			return makeSupervisorGetTaskPath(svcName, taskId)
+		} else if httpMethod == http.MethodPost && !shutDownTask {
+			return makeSupervisorCreateUpdateTaskPath(svcName)
+		} else if shutDownTask {
+			return makeSupervisorShutDownTaskPath(svcName, taskId)
+		}
	case v1alpha1.Kinesis:
	case v1alpha1.QueryControllerSQL:
	default:
@@ -312,16 +326,39 @@ func makeRouterGetTaskPath(svcName, taskId string) string {
	return svcName + "/druid/indexer/v1/task/" + taskId
}

+func makeSupervisorCreateUpdateTaskPath(svcName string) string {
+	return svcName + "/druid/indexer/v1/supervisor"
+}
+
+func makeSupervisorShutDownTaskPath(svcName, taskId string) string {
+	return svcName + "/druid/indexer/v1/supervisor/" + taskId + "/shutdown"
+}
+
+func makeSupervisorGetTaskPath(svcName, taskId string) string {
+	return svcName + "/druid/indexer/v1/supervisor/" + taskId
+}
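These paths follow Druid's Overlord supervisor API. A quick sanity check of what the helpers return, as a hypothetical table test; the package name, test placement, and router URL are assumptions, not part of this PR:

package ingestion

import "testing"

func TestSupervisorPaths(t *testing.T) {
	// Illustrative in-cluster router service URL.
	svc := "http://druid-tiny-cluster-routers:8088"
	for _, tc := range []struct{ got, want string }{
		{makeSupervisorCreateUpdateTaskPath(svc), svc + "/druid/indexer/v1/supervisor"},
		{makeSupervisorGetTaskPath(svc, "metrics-kafka"), svc + "/druid/indexer/v1/supervisor/metrics-kafka"},
		{makeSupervisorShutDownTaskPath(svc, "metrics-kafka"), svc + "/druid/indexer/v1/supervisor/metrics-kafka/shutdown"},
	} {
		if tc.got != tc.want {
			t.Errorf("got %q, want %q", tc.got, tc.want)
		}
	}
}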

type taskHolder struct {
-	Task string `json:"task"`
+	Task string `json:"task"` // tasks
+	ID   string `json:"id"`   // supervisor
}

func getTaskIdFromResponse(resp string) (string, error) {
	var task taskHolder
	if err := json.Unmarshal([]byte(resp), &task); err != nil {
		return "", err
	}
-	return task.Task, nil
+
+	// Check both fields and return whichever is set:
+	// task responses and supervisor responses use different field names.
+	if task.Task != "" {
+		return task.Task, nil
+	}
+	if task.ID != "" {
+		return task.ID, nil
+	}
+
+	return "", errors.New("task id not found")
}
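For context, the two payload shapes this now handles: the task API answers with a "task" field, while the supervisor API answers with "id". A minimal standalone sketch, with illustrative payload values:

package main

import (
	"encoding/json"
	"fmt"
)

// Mirrors the taskHolder shape above.
type taskHolder struct {
	Task string `json:"task"`
	ID   string `json:"id"`
}

func main() {
	// Illustrative payloads: a native task response and a supervisor response.
	for _, resp := range []string{
		`{"task":"index_parallel_wikipedia_fhqka"}`,
		`{"id":"metrics-kafka"}`,
	} {
		var t taskHolder
		if err := json.Unmarshal([]byte(resp), &t); err != nil {
			panic(err)
		}
		fmt.Printf("task=%q id=%q\n", t.Task, t.ID)
	}
}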

func (r *DruidIngestionReconciler) getRouterSvcUrl(namespace, druidClusterName string) (string, error) {
74 changes: 74 additions & 0 deletions e2e/configs/kafka-ingestion.yaml
@@ -0,0 +1,74 @@
apiVersion: druid.apache.org/v1alpha1
kind: DruidIngestion
metadata:
  labels:
    app.kubernetes.io/name: druidingestion
    app.kubernetes.io/instance: druidingestion-sample
  name: kafka-1
spec:
  suspend: false
  druidCluster: tiny-cluster
  ingestion:
    type: kafka
    spec: |-
      {
        "type": "kafka",
        "spec": {
          "dataSchema": {
            "dataSource": "metrics-kafka",
            "timestampSpec": {
              "column": "timestamp",
              "format": "auto"
            },
            "dimensionsSpec": {
              "dimensions": [],
              "dimensionExclusions": [
                "timestamp",
                "value"
              ]
            },
            "metricsSpec": [
              {
                "name": "count",
                "type": "count"
              },
              {
                "name": "value_sum",
                "fieldName": "value",
                "type": "doubleSum"
              },
              {
                "name": "value_min",
                "fieldName": "value",
                "type": "doubleMin"
              },
              {
                "name": "value_max",
                "fieldName": "value",
                "type": "doubleMax"
              }
            ],
            "granularitySpec": {
              "type": "uniform",
              "segmentGranularity": "HOUR",
              "queryGranularity": "NONE"
            }
          },
          "ioConfig": {
            "topic": "metrics",
            "inputFormat": {
              "type": "json"
            },
            "consumerProperties": {
              "bootstrap.servers": "localhost:9092"
            },
            "taskCount": 1,
            "replicas": 1,
            "taskDuration": "PT1H"
          },
          "tuningConfig": {
            "type": "kafka",
            "maxRowsPerSegment": 5000000
          }
        }
      }
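Outside the operator, a spec like this can also be submitted by hand to the same endpoint the controller now targets. A minimal sketch, assuming an illustrative router URL and a local spec.json holding just the raw JSON block above:

package main

import (
	"bytes"
	"fmt"
	"io"
	"net/http"
	"os"
)

func main() {
	// Hypothetical file containing the raw JSON supervisor spec.
	spec, err := os.ReadFile("spec.json")
	if err != nil {
		panic(err)
	}

	// Illustrative URL; the path matches makeSupervisorCreateUpdateTaskPath.
	resp, err := http.Post(
		"http://druid-tiny-cluster-routers:8088/druid/indexer/v1/supervisor",
		"application/json",
		bytes.NewReader(spec),
	)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	// On success Druid answers with {"id":"<supervisorId>"}, the second
	// shape getTaskIdFromResponse handles.
	body, _ := io.ReadAll(resp.Body)
	fmt.Println(resp.Status, string(body))
}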
3 changes: 2 additions & 1 deletion e2e/configs/minio-tenant-override.yaml
@@ -1,6 +1,7 @@
tenant:
  pools:
-    - servers: 1
+    - name: "minio"
+      servers: 1
      volumesPerServer: 1
  certificate:
    requestAutoCert: false
17 changes: 14 additions & 3 deletions e2e/e2e.sh
@@ -4,7 +4,7 @@ set -x
# Get Kind
go install sigs.k8s.io/[email protected]
# minio statefulset name
-MINIO_STS_NAME=myminio-ss-0
+MINIO_STS_NAME=myminio-minio
# druid namespace
NAMESPACE=druid
# fmt code
@@ -23,8 +23,6 @@ make docker-build-local-test
make docker-push-local-test
# try to install the CRD with make
make install
-# delete the crd
-make uninstall
# install druid-operator
make helm-install-druid-operator
# install minio-operator and tenant
@@ -64,6 +62,19 @@ sleep 30 # wait for the manager to submit the ingestion task
taskId=`kubectl get druidingestion -n druid wikipedia-ingestion --template={{.status.taskId}}`
make deploy-testingestionjob TASK_ID=$taskId

+# Apply a test Kafka DruidIngestion resource and wait for the task to be submitted
+kubectl apply -f e2e/configs/kafka-ingestion.yaml -n ${NAMESPACE}
+sleep 30 # wait for the manager to submit the ingestion task
+
+# Verify that the supervisor task has been created
+supervisorTaskId=`kubectl get druidingestion -n druid kafka-1 --template={{.status.taskId}}`
+if [ -z "$supervisorTaskId" ]; then
+  echo "Failed to get supervisor task ID"
+  exit 1
+else
+  echo "Supervisor task ID: $supervisorTaskId"
+fi

# Delete old druid
kubectl delete -f e2e/configs/druid-cr.yaml -n ${NAMESPACE}
for d in $(kubectl get pods -n ${NAMESPACE} -l app=druid -l druid_cr=tiny-cluster -o name)
74 changes: 74 additions & 0 deletions examples/kafka-ingestion.yaml
@@ -0,0 +1,74 @@
apiVersion: druid.apache.org/v1alpha1
kind: DruidIngestion
metadata:
  labels:
    app.kubernetes.io/name: druidingestion
    app.kubernetes.io/instance: druidingestion-sample
  name: kafka-1
spec:
  suspend: false
  druidCluster: tiny-cluster
  ingestion:
    type: kafka
    spec: |-
      {
        "type": "kafka",
        "spec": {
          "dataSchema": {
            "dataSource": "metrics-kafka",
            "timestampSpec": {
              "column": "timestamp",
              "format": "auto"
            },
            "dimensionsSpec": {
              "dimensions": [],
              "dimensionExclusions": [
                "timestamp",
                "value"
              ]
            },
            "metricsSpec": [
              {
                "name": "count",
                "type": "count"
              },
              {
                "name": "value_sum",
                "fieldName": "value",
                "type": "doubleSum"
              },
              {
                "name": "value_min",
                "fieldName": "value",
                "type": "doubleMin"
              },
              {
                "name": "value_max",
                "fieldName": "value",
                "type": "doubleMax"
              }
            ],
            "granularitySpec": {
              "type": "uniform",
              "segmentGranularity": "HOUR",
              "queryGranularity": "NONE"
            }
          },
          "ioConfig": {
            "topic": "metrics",
            "inputFormat": {
              "type": "json"
            },
            "consumerProperties": {
              "bootstrap.servers": "localhost:9092"
            },
            "taskCount": 1,
            "replicas": 1,
            "taskDuration": "PT1H"
          },
          "tuningConfig": {
            "type": "kafka",
            "maxRowsPerSegment": 5000000
          }
        }
      }