Bug fix
Update func Reconcile, the Reconciler struct, pipelinerun_expired, taskrun_expired, and the controllers.
anxinyf committed Dec 15, 2019
1 parent cd77e3d commit cfe7a94
Showing 15 changed files with 151 additions and 164 deletions.
3 changes: 0 additions & 3 deletions pkg/apis/pipeline/v1alpha1/pipelinerun_types.go
@@ -130,9 +130,6 @@ type PipelineRunStatus struct {
// map of PipelineRunTaskRunStatus with the taskRun name as the key
// +optional
TaskRuns map[string]*PipelineRunTaskRunStatus `json:"taskRuns,omitempty"`

// ExpirationTime is the time the PipelineRun expired.
ExpirationTime *metav1.Time `json:"expirationTime, omitempty"`

@anxinyf (Author) commented on Dec 17, 2019:

The ExpirationTime will not exist with this PR, maybe next :)

}

// PipelineRunTaskRunStatus contains the name of the PipelineTask for this TaskRun and the TaskRun's Status
5 changes: 0 additions & 5 deletions pkg/apis/pipeline/v1alpha1/taskrun_types.go
@@ -143,11 +143,6 @@ type TaskRunStatus struct {
// the digest of build container images
// optional
ResourcesResult []PipelineResourceResult `json:"resourcesResult,omitempty"`

// ExpirationTime is the time the build expired.
// If TaskRun is built from PipelineRun, the time is the PipelineRun's ExpirationTime.
// If TaskRun doesn't have PipelineRun OwnerReference, then the time is TaskRun's ExpirationTime
ExpirationTime *metav1.Time `json:"expirationTime, omitempty"`

@anxinyf (Author) commented on Dec 17, 2019:

The ExpirationTime will not exist with this PR, maybe next :)

}

// GetCondition returns the Condition matching the given type.
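A note for when the field is reintroduced: both removed tags are written json:"expirationTime, omitempty" with a space before omitempty. encoding/json matches tag options by exact string, so the stray space silently disables omitempty and a nil pointer is serialized as "expirationTime":null instead of being dropped. A minimal, runnable sketch of the difference (using a *string stand-in for *metav1.Time to stay self-contained):

package main

import (
	"encoding/json"
	"fmt"
)

// withSpace mirrors the removed tag: the space after the comma makes
// encoding/json parse the option as " omitempty", which does not match
// "omitempty", so the field is always emitted.
type withSpace struct {
	ExpirationTime *string `json:"expirationTime, omitempty"`
}

// withoutSpace spells the option correctly: a nil pointer is omitted.
type withoutSpace struct {
	ExpirationTime *string `json:"expirationTime,omitempty"`
}

func main() {
	a, _ := json.Marshal(withSpace{})
	b, _ := json.Marshal(withoutSpace{})
	fmt.Println(string(a)) // {"expirationTime":null}
	fmt.Println(string(b)) // {}
}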
8 changes: 0 additions & 8 deletions pkg/apis/pipeline/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default.

29 changes: 6 additions & 23 deletions pkg/reconciler/pipelinerun/controller.go
@@ -21,7 +21,6 @@ import (
"time"

"github.com/tektoncd/pipeline/pkg/apis/pipeline"
"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1alpha1"
pipelineclient "github.com/tektoncd/pipeline/pkg/client/injection/client"
clustertaskinformer "github.com/tektoncd/pipeline/pkg/client/injection/informers/pipeline/v1alpha1/clustertask"
conditioninformer "github.com/tektoncd/pipeline/pkg/client/injection/informers/pipeline/v1alpha1/condition"
@@ -34,7 +33,6 @@ import (
"github.com/tektoncd/pipeline/pkg/reconciler/pipelinerun/config"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
kubeclient "knative.dev/pkg/client/injection/kube/client"
"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
@@ -83,7 +81,6 @@ func NewController(images pipeline.Images) func(context.Context, configmap.Watch
conditionLister: conditionInformer.Lister(),
timeoutHandler: timeoutHandler,
metrics: metrics,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ttl_pipelineruns_to_delete"),
}
impl := controller.NewImpl(c, c.Logger, pipelineRunControllerName)

@@ -97,27 +94,10 @@ func NewController(images pipeline.Images) func(context.Context, configmap.Watch
DeleteFunc: impl.Enqueue,
})

AddPipelineRun := func(obj interface{}) {
pr := obj.(*v1alpha1.PipelineRun)
c.Logger.Infof("Adding PipelineRun %s/%s", pr.Namespace, pr.Name)

if pr.DeletionTimestamp == nil && pipelineRunCleanup(pr) {
impl.Enqueue(pr)
}
}

UpdatePipelineRun := func(old, cur interface{}) {
pr := cur.(*v1alpha1.PipelineRun)
c.Logger.Infof("Updating PipelineRun %s/%s", pr.Namespace, pr.Name)

if pr.DeletionTimestamp == nil && pipelineRunCleanup(pr) {
impl.Enqueue(pr)
}
}

pipelineRunInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: AddPipelineRun,
UpdateFunc: UpdatePipelineRun,
AddFunc: c.AddPipelineRun,
UpdateFunc: c.UpdatePipelineRun,
DeleteFunc: impl.Enqueue,

@anxinyf (Author) commented on Dec 17, 2019:

In my opinion, this pipelineRunInformer registration can replace the previous one.

})

c.pipelineRunLister = pipelineRunInformer.Lister()
@@ -128,6 +108,9 @@ func NewController(images pipeline.Images) func(context.Context, configmap.Watch
UpdateFunc: controller.PassNew(impl.EnqueueControllerOf),
})

c.taskRunLister = taskRunInformer.Lister()
c.ListerSynced = taskRunInformer.Informer().HasSynced

c.clock = clock.RealClock{}
c.Logger.Info("Setting up ConfigMap receivers")
c.configStore = config.NewStore(images, c.Logger.Named("config-store"))
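On the inline note above: the two AddEventHandler registrations on pipelineRunInformer could indeed be collapsed into one. A sketch of a merged registration inside NewController, assuming the TTL filtering stays in c.AddPipelineRun / c.UpdatePipelineRun as defined in pipelinerun_expired.go (an illustration, not the committed code):

// Single registration: always enqueue for ordinary reconciliation, and
// additionally apply the TTL-cleanup filter for finished runs with a TTL.
pipelineRunInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
	AddFunc: func(obj interface{}) {
		impl.Enqueue(obj)     // reconciliation path
		c.AddPipelineRun(obj) // TTL path: no-op unless done and TTL set
	},
	UpdateFunc: func(old, cur interface{}) {
		impl.Enqueue(cur)
		c.UpdatePipelineRun(old, cur)
	},
	DeleteFunc: impl.Enqueue,
})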
19 changes: 10 additions & 9 deletions pkg/reconciler/pipelinerun/pipelinerun.go
@@ -41,7 +41,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"knative.dev/pkg/apis"
"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
@@ -108,9 +107,6 @@ type Reconciler struct {
// The clock for tracking time
clock clock.Clock

// PipelineRuns that the controller will check its TTL and attempt to delete when the TTL expires.
queue workqueue.RateLimitingInterface

// ListerSynced returns true if the TaskRun store has been synced at least once.
// Added as a member to the struct to allow injection for testing.
ListerSynced cache.InformerSynced
@@ -166,6 +162,7 @@ func (c *Reconciler) Reconcile(ctx context.Context, key string) error {
var merr error

if pr.IsDone() {

if err := artifacts.CleanupArtifactStorage(pr, c.KubeClientSet, c.Logger); err != nil {
c.Logger.Errorf("Failed to delete PVC for PipelineRun %s: %v", pr.Name, err)
return err
Expand All @@ -181,6 +178,10 @@ func (c *Reconciler) Reconcile(ctx context.Context, key string) error {
c.Logger.Warnf("Failed to log the metrics : %v", err)
}
}(c.metrics)
if err := c.processPipelineRunExpired(pr.Namespace, pr.Name, pr); err != nil {
c.Logger.Errorf("Failed to cleanup the expired PipelineRun %s/%s: %v", pr.Namespace, pr.Name, err)
return err
}
} else {
if err := c.tracker.Track(pr.GetTaskRunRef(), pr); err != nil {
c.Logger.Errorf("Failed to create tracker for TaskRuns for PipelineRun %s: %v", pr.Name, err)
Expand Down Expand Up @@ -226,7 +227,7 @@ func (c *Reconciler) Reconcile(ctx context.Context, key string) error {
}(c.metrics)
}

return multierror.Append(merr, c.processPipelineRunExpired(namespace, name, pr))
return merr
}

func (c *Reconciler) getPipelineFunc(tr *v1alpha1.PipelineRun) resources.GetPipeline {
@@ -578,7 +579,7 @@ func addRetryHistory(tr *v1alpha1.TaskRun) {
func clearStatus(tr *v1alpha1.TaskRun) {
tr.Status.StartTime = nil
tr.Status.CompletionTime = nil
tr.Status.ExpirationTime = nil
//tr.Status.ExpirationTime = nil
tr.Status.PodName = ""
}

@@ -642,9 +643,9 @@ func (c *Reconciler) updateStatus(pr *v1alpha1.PipelineRun) (*v1alpha1.PipelineR
pr.Status.CompletionTime = &metav1.Time{Time: time.Now()}

// update pr expiration time
if pr.Spec.ExpirationSecondsTTL != nil {
pr.Status.ExpirationTime.Time = pr.Status.CompletionTime.Add(pr.Spec.ExpirationSecondsTTL.Duration * time.Second)
}
//if pr.Spec.ExpirationSecondsTTL != nil {
// pr.Status.ExpirationTime.Time = pr.Status.CompletionTime.Add(pr.Spec.ExpirationSecondsTTL.Duration * time.Second)
//}

}
if !reflect.DeepEqual(pr.Status, newPr.Status) {
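Two details in the commented-out block above are worth fixing before it returns. Assuming ExpirationSecondsTTL is a *metav1.Duration (the tests construct it as metav1.Duration{Duration: 10 * time.Second}), its Duration field is already a time.Duration, so multiplying by time.Second over-scales the TTL by a factor of a billion — a 1s TTL becomes roughly 31.7 years, and anything larger overflows int64. And pr.Status.ExpirationTime starts out nil, so assigning through .Time would panic while the pointer is unset. A corrected sketch under those assumptions:

// Sketch for updateStatus: expiration is completion time plus the TTL.
if pr.Spec.ExpirationSecondsTTL != nil && pr.Status.CompletionTime != nil {
	expireAt := pr.Status.CompletionTime.Add(pr.Spec.ExpirationSecondsTTL.Duration)
	// Allocate a fresh Time rather than writing through a nil pointer.
	pr.Status.ExpirationTime = &metav1.Time{Time: expireAt}
}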
52 changes: 27 additions & 25 deletions pkg/reconciler/pipelinerun/pipelinerun_expired.go
@@ -8,28 +8,37 @@ import (
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/cache"
"knative.dev/pkg/apis"
"knative.dev/pkg/controller"
)

func (tc *Reconciler) PrEnqueueAfter(pr *apispipeline.PipelineRun, after time.Duration) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(pr)
if err != nil {
tc.Logger.Errorf("couldn't get key for object %#v: %v", pr, err)
return
const ControllerName = "TTLExpiredController"

func (tc *Reconciler) AddPipelineRun(obj interface{}) {
pr := obj.(*apispipeline.PipelineRun)
tc.Logger.Infof("Adding PipelineRun %s/%s if the PipelineRun has succeeded or failed and has a TTL set.", pr.Namespace, pr.Name)

if pr.DeletionTimestamp == nil && pipelineRunCleanup(pr) {
controller.NewImpl(tc, tc.Logger, ControllerName).Enqueue(pr)
}
}

tc.queue.AddAfter(key, after)
func (tc *Reconciler) UpdatePipelineRun(old, cur interface{}) {
pr := cur.(*apispipeline.PipelineRun)
tc.Logger.Infof("Updating PipelineRun %s/%s if the PipelineRun has succeed or failed and has a TTL set.", pr.Namespace, pr.Name)

if pr.DeletionTimestamp == nil && pipelineRunCleanup(pr) {
controller.NewImpl(tc, tc.Logger, ControllerName).Enqueue(pr)
}
}

// processPipelineRunExpired checks the PipelineRun's state and TTL, and deletes the PipelineRun
// once it has finished and its TTL has expired. If the PipelineRun hasn't finished or its
// TTL hasn't expired, it is re-enqueued to be checked again when the TTL is expected
// to expire.
// This function is not meant to be invoked concurrently with the same key.
func (tc *Reconciler) processPipelineRunExpired(namespace, name string, obj interface{}) error {
ob := obj.(*apispipeline.PipelineRun)
if expired, err := tc.processPrTTL(ob); err != nil {
func (tc *Reconciler) processPipelineRunExpired(namespace, name string, pr *apispipeline.PipelineRun) error {
if expired, err := tc.processPrTTL(pr); err != nil {
return err
} else if !expired {
return nil
@@ -82,19 +91,10 @@ func (tc *Reconciler) processPrTTL(pr *apispipeline.PipelineRun) (expired bool,
return true, nil
}

tc.PrEnqueueAfter(pr, *t)
controller.NewImpl(tc, tc.Logger, ControllerName).EnqueueAfter(pr, *t)
return false, nil
}

func IsPipelineRunSucceeded(pr *apispipeline.PipelineRun) bool {
for _, con := range pr.Status.Conditions {
if con.Type == apis.ConditionSucceeded && con.Status == v1.ConditionTrue {
return true
}
}
return false
}

func getPrFinishAndExpireTime(pr *apispipeline.PipelineRun) (*time.Time, *time.Time, error) {
if !pipelineRunCleanup(pr) {
return nil, nil, fmt.Errorf("PipelineRun %s/%s should not be cleaned up", pr.Namespace, pr.Name)
Expand All @@ -116,15 +116,17 @@ func (tc *Reconciler) prTimeLeft(pr *apispipeline.PipelineRun, since *time.Time)
if finishAt.UTC().After(since.UTC()) {
tc.Logger.Warnf("Warning: Found PipelineRun %s/%s succeeded in the future. This is likely due to time skew in the cluster. PipelineRun cleanup will be deferred.", pr.Namespace, pr.Name)
}

remaining := expireAt.UTC().Sub(since.UTC())
tc.Logger.Infof("Found PipelineRun %s/%s succeeded at %v, remaining TTL %v since %v, TTL will expire at %v", pr.Namespace, pr.Name, finishAt.UTC(), remaining, since.UTC(), expireAt.UTC())

return &remaining, nil
}

// pipelineRunFinishTime takes an already finished PipelineRun and returns the time it finished.
func pipelineRunFinishTime(pr *apispipeline.PipelineRun) (apis.VolatileTime, error) {
for _, con := range pr.Status.Conditions {
if con.Type == apis.ConditionSucceeded && con.Status == v1.ConditionTrue {
if con.Type == apis.ConditionSucceeded && con.Status != v1.ConditionUnknown {
finishAt := con.LastTransitionTime
if finishAt.Inner.IsZero() {
return apis.VolatileTime{}, fmt.Errorf("unable to find the time when the PipelineRun %s/%s succeeded", pr.Namespace, pr.Name)
Expand All @@ -133,11 +135,11 @@ func pipelineRunFinishTime(pr *apispipeline.PipelineRun) (apis.VolatileTime, err
}
}

// This should never happen if the PipelineRuns has succeeded
return apis.VolatileTime{}, fmt.Errorf("unable to find the status of the succeeded PipelineRun %s/%s", pr.Namespace, pr.Name)
// This should never happen if the PipelineRun has succeeded or failed
return apis.VolatileTime{}, fmt.Errorf("unable to find the status of the succeeded or failed PipelineRun %s/%s", pr.Namespace, pr.Name)
}

// pipelineRunCleanup checks whether a PipelineRun or PipelineRun has succeeded and has a TTL set.
// pipelineRunCleanup checks whether a PipelineRun has succeeded or failed and has a TTL set.
func pipelineRunCleanup(pr *apispipeline.PipelineRun) bool {
return pr.Spec.ExpirationSecondsTTL != nil && IsPipelineRunSucceeded(pr)
return pr.Spec.ExpirationSecondsTTL != nil && pr.IsDone()
}
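The flow in this file: pipelineRunCleanup gates cleanup on IsDone plus a TTL being set, processPrTTL computes the remaining time, and a run that hasn't expired yet is re-enqueued for later. One thing to double-check: each handler above builds a fresh controller.NewImpl(...), which appears to allocate its own work queue per event, so a delayed EnqueueAfter lands on a queue nothing drains; holding a single Impl on the Reconciler would be the more conventional shape. A self-contained sketch of the expiry arithmetic the helpers implement (hypothetical names, standard library only):

package main

import (
	"fmt"
	"time"
)

// timeLeft mirrors prTimeLeft: given when the run finished, its TTL, and
// the current time, return how long until the run may be deleted.
// Zero or negative means the run has expired.
func timeLeft(finishedAt time.Time, ttl time.Duration, now time.Time) time.Duration {
	return finishedAt.Add(ttl).Sub(now)
}

func main() {
	now := time.Now()
	finished := now.Add(-10 * time.Second) // run failed 10s ago

	if left := timeLeft(finished, 15*time.Second, now); left > 0 {
		fmt.Printf("not expired; re-enqueue after %v\n", left) // 5s
	} else {
		fmt.Println("expired; delete the PipelineRun")
	}
}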
36 changes: 32 additions & 4 deletions pkg/reconciler/pipelinerun/pipelinerun_expired_test.go
@@ -15,7 +15,7 @@ import (
)

func newPipelineRun(completionTime, failedTime apis.VolatileTime, ttl *metav1.Duration) *apispipeline.PipelineRun {
pr := tb.PipelineRun("test-pipeline-run-with-annotations-hello-world-1-9l9zj", "foo",
pr := tb.PipelineRun("test-pipeline-run-with-expiration-ttl", "foo",
tb.PipelineRunLabel("tekton.dev/pipeline", "test-pipeline"),
tb.PipelineRunLabel(pipeline.GroupName+pipeline.PipelineTaskLabelKey, "hello-world-1"),
tb.PipelineRunLabel("tekton.dev/pipelineRun", "test-pipeline-run-with-annotations"),
Expand All @@ -32,7 +32,7 @@ func newPipelineRun(completionTime, failedTime apis.VolatileTime, ttl *metav1.Du
}

if !failedTime.Inner.IsZero() {
c := apis.Condition{Type: apis.ConditionSucceeded, Status: v1.ConditionTrue, LastTransitionTime: failedTime}
c := apis.Condition{Type: apis.ConditionSucceeded, Status: v1.ConditionFalse, LastTransitionTime: failedTime}
pr.Status.Conditions = append(pr.Status.Conditions, c)
}

@@ -66,14 +66,14 @@ func TestTimeLeft(t *testing.T) {
ttl: &metav1.Duration{Duration: 10 * time.Second},
since: &now.Inner.Time,
expectErr: true,
expectErrStr: "should not be cleaned up",
expectErrStr: "PipelineRun foo/test-pipeline-run-with-expiration-ttl should not be cleaned up",
},
{
name: "Error case: PipelineRun completed now, no TTL",
completionTime: now,
since: &now.Inner.Time,
expectErr: true,
expectErrStr: "should not be cleaned up",
expectErrStr: "PipelineRun foo/test-pipeline-run-with-expiration-ttl should not be cleaned up",
},
{
name: "PipelineRun completed now, 0s TTL",
Expand All @@ -96,6 +96,34 @@ func TestTimeLeft(t *testing.T) {
since: &now.Inner.Time,
expectedTimeLeft: durationPointer(5),
},
{
name: "Error case: PipelineRun failed now, no TTL",
failedTime: now,
since: &now.Inner.Time,
expectErr: true,
expectErrStr: "PipelineRun foo/test-pipeline-run-with-expiration-ttl should not be cleaned up",
},
{
name: "PipelineRun failed now, 0s TTL",
failedTime: now,
ttl: &metav1.Duration{Duration: 0 * time.Second},
since: &now.Inner.Time,
expectedTimeLeft: durationPointer(0),
},
{
name: "PipelineRun failed now, 10s TTL",
failedTime: now,
ttl: &metav1.Duration{Duration: 10 * time.Second},
since: &now.Inner.Time,
expectedTimeLeft: durationPointer(10),
},
{
name: "PipelineRun failed 10s ago, 15s TTL",
failedTime: apis.VolatileTime{Inner: metav1.NewTime(now.Inner.Add(-10 * time.Second))},
ttl: &metav1.Duration{Duration: 15 * time.Second},
since: &now.Inner.Time,
expectedTimeLeft: durationPointer(5),
},
}
for _, tc := range PrTestCases {
pr := newPipelineRun(tc.completionTime, tc.failedTime, tc.ttl)
25 changes: 2 additions & 23 deletions pkg/reconciler/taskrun/controller.go
@@ -32,7 +32,6 @@ import (
cloudeventclient "github.com/tektoncd/pipeline/pkg/reconciler/taskrun/resources/cloudevent"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
kubeclient "knative.dev/pkg/client/injection/kube/client"
podinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/pod"
"knative.dev/pkg/configmap"
@@ -78,8 +77,6 @@ func NewController(images pipeline.Images) func(context.Context, configmap.Watch
timeoutHandler: timeoutHandler,
cloudEventClient: cloudeventclient.Get(ctx),
metrics: metrics,
//recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "ttl-after-finished-controller"}),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ttl_taskruns_to_delete"),
}
impl := controller.NewImpl(c, c.Logger, taskRunControllerName)

@@ -92,27 +89,9 @@ func NewController(images pipeline.Images) func(context.Context, configmap.Watch
UpdateFunc: controller.PassNew(impl.Enqueue),
})

AddTaskRun := func(obj interface{}) {
tr := obj.(*v1alpha1.TaskRun)
c.Logger.Infof("Adding TaskRun %s/%s", tr.Namespace, tr.Name)

if tr.DeletionTimestamp == nil && taskRunCleanup(tr) {
impl.Enqueue(tr)
}
}

UpdateTaskRun := func(old, cur interface{}) {
tr := cur.(*v1alpha1.TaskRun)
c.Logger.Infof("Updating TaskRun %s/%s", tr.Namespace, tr.Name)

if tr.DeletionTimestamp == nil && taskRunCleanup(tr) {
impl.Enqueue(tr)
}
}

taskRunInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: AddTaskRun,
UpdateFunc: UpdateTaskRun,
AddFunc: c.AddTaskRun,
UpdateFunc: c.UpdateTaskRun,

@anxinyf (Author) commented on Dec 17, 2019:

Like pipelineRunInformer, this taskRunInformer registration can replace the previous one.

})

c.taskRunLister = taskRunInformer.Lister()
Expand Down