Skip to content

Commit

Permalink
List storage jobs query API (#748)
Browse files Browse the repository at this point in the history
* first cut a a list implementation for storage jobs

Signed-off-by: Aaron Sutula <[email protected]>

* bubble up list storage jobs api through scheduler, ffs, and apis

Signed-off-by: Aaron Sutula <[email protected]>

* client for list, better name for next page token

Signed-off-by: Aaron Sutula <[email protected]>

* list cli

Signed-off-by: Aaron Sutula <[email protected]>

* fix selector filter

Signed-off-by: Aaron Sutula <[email protected]>

* remove old/redundant code

Signed-off-by: Aaron Sutula <[email protected]>

* better alias

Signed-off-by: Aaron Sutula <[email protected]>

* pr feedback

Signed-off-by: Aaron Sutula <[email protected]>

* remove counts

Signed-off-by: Aaron Sutula <[email protected]>

* migration

Signed-off-by: Aaron Sutula <[email protected]>

* good tests for sjstore.List

Signed-off-by: Aaron Sutula <[email protected]>
  • Loading branch information
asutula committed Jan 16, 2021
1 parent e88f105 commit b1ba783
Show file tree
Hide file tree
Showing 41 changed files with 2,681 additions and 3,470 deletions.
124 changes: 63 additions & 61 deletions api/client/admin/storagejobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,96 +4,98 @@ import (
"context"

adminPb "github.com/textileio/powergate/api/gen/powergate/admin/v1"
userPb "github.com/textileio/powergate/api/gen/powergate/user/v1"
)

// StorageJobs provides access to Powergate jobs admin APIs.
type StorageJobs struct {
client adminPb.AdminServiceClient
}

type storageJobsConfig struct {
UserID string
Cids []string
}
// ListSelect specifies which StorageJobs to list.
type ListSelect int32

// StorageJobsOption configures a storageJobsConfig.
type StorageJobsOption = func(*storageJobsConfig)
const (
// All lists all StorageJobs and is the default.
All ListSelect = iota
// Queued lists queued StorageJobs.
Queued
// Executing lists executing StorageJobs.
Executing
// Final lists final StorageJobs.
Final
)

// WithUserID filters the results to the specified user.
func WithUserID(userID string) StorageJobsOption {
return func(conf *storageJobsConfig) {
conf.UserID = userID
}
// ListConfig controls the behavior for listing StorageJobs.
type ListConfig struct {
// UserIDFilter filters StorageJobs list to the specified user ID. Defaults to no filter.
UserIDFilter string
// CidFilter filters StorageJobs list to the specified cid. Defaults to no filter.
CidFilter string
// Limit limits the number of StorageJobs returned. Defaults to no limit.
Limit uint64
// Ascending returns the StorageJobs ascending by time. Defaults to false, descending.
Ascending bool
// Select specifies to return StorageJobs in the specified state.
Select ListSelect
// NextPageToken sets the slug from which to start building the next page of results.
NextPageToken string
}

// WithCids filters the results to the specified data cids.
func WithCids(cids ...string) StorageJobsOption {
return func(conf *storageJobsConfig) {
conf.Cids = cids
}
type summaryConfig struct {
userID string
cid string
}

// Queued returns a list of queued storage jobs.
func (j *StorageJobs) Queued(ctx context.Context, opts ...StorageJobsOption) (*adminPb.QueuedStorageJobsResponse, error) {
conf := &storageJobsConfig{}
for _, opt := range opts {
opt(conf)
}
req := &adminPb.QueuedStorageJobsRequest{
UserId: conf.UserID,
Cids: conf.Cids,
}
return j.client.QueuedStorageJobs(ctx, req)
}
// SummaryOption configures a storageJobsConfig.
type SummaryOption = func(*summaryConfig)

// Executing returns a list of executing storage jobs.
func (j *StorageJobs) Executing(ctx context.Context, opts ...StorageJobsOption) (*adminPb.ExecutingStorageJobsResponse, error) {
conf := &storageJobsConfig{}
for _, opt := range opts {
opt(conf)
}
req := &adminPb.ExecutingStorageJobsRequest{
UserId: conf.UserID,
Cids: conf.Cids,
// WithUserID filters the results to the specified user.
func WithUserID(userID string) SummaryOption {
return func(conf *summaryConfig) {
conf.userID = userID
}
return j.client.ExecutingStorageJobs(ctx, req)
}

// LatestFinal returns a list of latest final storage jobs.
func (j *StorageJobs) LatestFinal(ctx context.Context, opts ...StorageJobsOption) (*adminPb.LatestFinalStorageJobsResponse, error) {
conf := &storageJobsConfig{}
for _, opt := range opts {
opt(conf)
}
req := &adminPb.LatestFinalStorageJobsRequest{
UserId: conf.UserID,
Cids: conf.Cids,
// WithCid filters the results to the specified data cid.
func WithCid(cid string) SummaryOption {
return func(conf *summaryConfig) {
conf.cid = cid
}
return j.client.LatestFinalStorageJobs(ctx, req)
}

// LatestSuccessful returns a list of latest successful storage jobs.
func (j *StorageJobs) LatestSuccessful(ctx context.Context, opts ...StorageJobsOption) (*adminPb.LatestSuccessfulStorageJobsResponse, error) {
conf := &storageJobsConfig{}
for _, opt := range opts {
opt(conf)
// List lists StorageJobs according to the provided ListConfig.
func (j *StorageJobs) List(ctx context.Context, config ListConfig) (*adminPb.ListStorageJobsResponse, error) {
sel := userPb.StorageJobsSelector_STORAGE_JOBS_SELECTOR_UNSPECIFIED
switch config.Select {
case All:
sel = userPb.StorageJobsSelector_STORAGE_JOBS_SELECTOR_ALL
case Queued:
sel = userPb.StorageJobsSelector_STORAGE_JOBS_SELECTOR_QUEUED
case Executing:
sel = userPb.StorageJobsSelector_STORAGE_JOBS_SELECTOR_EXECUTING
case Final:
sel = userPb.StorageJobsSelector_STORAGE_JOBS_SELECTOR_FINAL
}
req := &adminPb.LatestSuccessfulStorageJobsRequest{
UserId: conf.UserID,
Cids: conf.Cids,
req := &adminPb.ListStorageJobsRequest{
CidFilter: config.CidFilter,
Limit: config.Limit,
Ascending: config.Ascending,
NextPageToken: config.NextPageToken,
Selector: sel,
}
return j.client.LatestSuccessfulStorageJobs(ctx, req)
return j.client.ListStorageJobs(ctx, req)
}

// Summary returns a summary of storage jobs.
func (j *StorageJobs) Summary(ctx context.Context, opts ...StorageJobsOption) (*adminPb.StorageJobsSummaryResponse, error) {
conf := &storageJobsConfig{}
func (j *StorageJobs) Summary(ctx context.Context, opts ...SummaryOption) (*adminPb.StorageJobsSummaryResponse, error) {
conf := &summaryConfig{}
for _, opt := range opts {
opt(conf)
}
req := &adminPb.StorageJobsSummaryRequest{
UserId: conf.UserID,
Cids: conf.Cids,
UserId: conf.userID,
Cid: conf.cid,
}
return j.client.StorageJobsSummary(ctx, req)
}
97 changes: 67 additions & 30 deletions api/client/storagejobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,48 @@ type StorageJobs struct {
client userPb.UserServiceClient
}

// ListSelect specifies which StorageJobs to list.
type ListSelect int32

const (
// All lists all StorageJobs and is the default.
All ListSelect = iota
// Queued lists queued StorageJobs.
Queued
// Executing lists executing StorageJobs.
Executing
// Final lists final StorageJobs.
Final
)

// ListConfig controls the behavior for listing StorageJobs.
type ListConfig struct {
// CidFilter filters StorageJobs list to the specified cid. Defaults to no filter.
CidFilter string
// Limit limits the number of StorageJobs returned. Defaults to no limit.
Limit uint64
// Ascending returns the StorageJobs ascending by time. Defaults to false, descending.
Ascending bool
// Select specifies to return StorageJobs in the specified state.
Select ListSelect
// NextPageToken sets the slug from which to start building the next page of results.
NextPageToken string
}

type summaryConfig struct {
cid string
}

// SummaryOption configures a storageJobsConfig.
type SummaryOption = func(*summaryConfig)

// WithCid filters the results to the specified data cid.
func WithCid(cid string) SummaryOption {
return func(conf *summaryConfig) {
conf.cid = cid
}
}

// WatchStorageJobsEvent represents an event for Watching a job.
type WatchStorageJobsEvent struct {
Res *userPb.WatchStorageJobsResponse
Expand All @@ -30,42 +72,37 @@ func (j *StorageJobs) StorageConfigForJob(ctx context.Context, jobID string) (*u
return j.client.StorageConfigForJob(ctx, &userPb.StorageConfigForJobRequest{JobId: jobID})
}

// Queued returns a list of queued storage jobs.
func (j *StorageJobs) Queued(ctx context.Context, cids ...string) (*userPb.QueuedStorageJobsResponse, error) {
req := &userPb.QueuedStorageJobsRequest{
Cids: cids,
}
return j.client.QueuedStorageJobs(ctx, req)
}

// Executing returns a list of executing storage jobs.
func (j *StorageJobs) Executing(ctx context.Context, cids ...string) (*userPb.ExecutingStorageJobsResponse, error) {
req := &userPb.ExecutingStorageJobsRequest{
Cids: cids,
}
return j.client.ExecutingStorageJobs(ctx, req)
}

// LatestFinal returns a list of latest final storage jobs.
func (j *StorageJobs) LatestFinal(ctx context.Context, cids ...string) (*userPb.LatestFinalStorageJobsResponse, error) {
req := &userPb.LatestFinalStorageJobsRequest{
Cids: cids,
// List lists StorageJobs according to the provided ListConfig.
func (j *StorageJobs) List(ctx context.Context, config ListConfig) (*userPb.ListStorageJobsResponse, error) {
sel := userPb.StorageJobsSelector_STORAGE_JOBS_SELECTOR_UNSPECIFIED
switch config.Select {
case All:
sel = userPb.StorageJobsSelector_STORAGE_JOBS_SELECTOR_ALL
case Queued:
sel = userPb.StorageJobsSelector_STORAGE_JOBS_SELECTOR_QUEUED
case Executing:
sel = userPb.StorageJobsSelector_STORAGE_JOBS_SELECTOR_EXECUTING
case Final:
sel = userPb.StorageJobsSelector_STORAGE_JOBS_SELECTOR_FINAL
}
return j.client.LatestFinalStorageJobs(ctx, req)
}

// LatestSuccessful returns a list of latest successful storage jobs.
func (j *StorageJobs) LatestSuccessful(ctx context.Context, cids ...string) (*userPb.LatestSuccessfulStorageJobsResponse, error) {
req := &userPb.LatestSuccessfulStorageJobsRequest{
Cids: cids,
req := &userPb.ListStorageJobsRequest{
CidFilter: config.CidFilter,
Limit: config.Limit,
Ascending: config.Ascending,
NextPageToken: config.NextPageToken,
Selector: sel,
}
return j.client.LatestSuccessfulStorageJobs(ctx, req)
return j.client.ListStorageJobs(ctx, req)
}

// Summary returns a summary of storage jobs.
func (j *StorageJobs) Summary(ctx context.Context, cids ...string) (*userPb.StorageJobsSummaryResponse, error) {
func (j *StorageJobs) Summary(ctx context.Context, opts ...SummaryOption) (*userPb.StorageJobsSummaryResponse, error) {
conf := &summaryConfig{}
for _, opt := range opts {
opt(conf)
}
req := &userPb.StorageJobsSummaryRequest{
Cids: cids,
Cid: conf.cid,
}
return j.client.StorageJobsSummary(ctx, req)
}
Expand Down
Loading

0 comments on commit b1ba783

Please sign in to comment.