diff --git a/.github/workflows/build_copier.yml b/.github/workflows/build_copier.yml new file mode 100644 index 0000000000..f8682c4d7b --- /dev/null +++ b/.github/workflows/build_copier.yml @@ -0,0 +1,106 @@ +name: copier-build +on: + workflow_run: + workflows: [ci-worker] + types: [completed] + branches: [main, release] +concurrency: + group: ${{ github.workflow }}-${{ github.event.workflow_run.head_branch }} + cancel-in-progress: true + +jobs: + info: + name: Collect information + runs-on: ubuntu-latest + if: github.event.workflow_run.conclusion != 'failure' && github.event.repository.full_name == 'reearth/reearth-cms' && (github.event.workflow_run.head_branch == 'release' || !startsWith(github.event.workflow_run.head_commit.message, 'v')) + outputs: + sha_short: ${{ steps.info.outputs.sha_short }} + new_tag: ${{ steps.info.outputs.new_tag }} + new_tag_short: ${{ steps.info.outputs.new_tag_short }} + name: ${{ steps.info.outputs.name }} + steps: + - name: checkout + uses: actions/checkout@v4 + - name: Fetch tags + run: git fetch --prune --unshallow --tags + - name: Get info + id: info + # The tag name should be retrieved lazily, as tagging may be delayed. + env: + BRANCH: ${{ github.event.workflow_run.head_branch }} + run: | + echo "sha_short=$(git rev-parse --short HEAD)" >> "$GITHUB_OUTPUT" + if [[ "$BRANCH" = "release" ]]; then + TAG=$(git tag --points-at HEAD) + if [[ ! -z "$TAG" ]]; then + echo "new_tag=$TAG" >> "$GITHUB_OUTPUT" + echo "new_tag_short=${TAG#v}" >> "$GITHUB_OUTPUT" + else + echo "name=rc" >> "$GITHUB_OUTPUT" + fi + else + echo "name=nightly" >> "$GITHUB_OUTPUT" + fi + - name: Show info + env: + SHA_SHORT: ${{ steps.info.outputs.sha_short }} + NEW_TAG: ${{ steps.info.outputs.new_tag }} + NEW_TAG_SHORT: ${{ steps.info.outputs.new_tag_short }} + NAME: ${{ steps.info.outputs.name }} + run: echo "sha_short=$SHA_SHORT, new_tag=$NEW_TAG, new_tag_short=$NEW_TAG_SHORT, name=$NAME" + + docker: + name: Build and push Docker image + runs-on: ubuntu-latest + needs: + - info + if: needs.info.outputs.name || needs.info.outputs.new_tag + env: + IMAGE_NAME: reearth/reearth-cms-copier + steps: + - name: Checkout + uses: actions/checkout@v4 + - name: Set up QEMU + uses: docker/setup-qemu-action@v3 + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 + - name: Login to DockerHub + uses: docker/login-action@v3 + with: + username: ${{ secrets.DOCKERHUB_USERNAME }} + password: ${{ secrets.DOCKERHUB_TOKEN }} + - name: Get options + id: options + env: + TAG: ${{ needs.info.outputs.new_tag_short }} + NAME: ${{ needs.info.outputs.name }} + SHA: ${{ needs.info.outputs.sha_short }} + run: | + if [[ -n $TAG ]]; then + PLATFORMS=linux/amd64,linux/arm64 + VERSION=$TAG + TAGS=$IMAGE_NAME:$TAG + if [[ ! $TAG =~ '-' ]]; then + TAGS+=,${IMAGE_NAME}:${TAG%.*} + TAGS+=,${IMAGE_NAME}:${TAG%%.*} + TAGS+=,${IMAGE_NAME}:latest + fi + else + PLATFORMS=linux/amd64 + VERSION=$SHA + TAGS=$IMAGE_NAME:$NAME + fi + echo "platforms=$PLATFORMS" >> "$GITHUB_OUTPUT" + echo "version=$VERSION" >> "$GITHUB_OUTPUT" + echo "tags=$TAGS" >> "$GITHUB_OUTPUT" + - name: Build and push docker image + uses: docker/build-push-action@v6 + with: + context: ./worker + file: ./worker/copier.Dockerfile + platforms: ${{ steps.options.outputs.platforms }} + push: true + build-args: VERSION=${{ steps.options.outputs.version }} + tags: ${{ steps.options.outputs.tags }} + cache-from: type=gha + cache-to: type=gha,mode=max diff --git a/.github/workflows/build_worker.yml b/.github/workflows/build_worker.yml index d5cce74fb0..6e2d7ee30d 100644 --- a/.github/workflows/build_worker.yml +++ b/.github/workflows/build_worker.yml @@ -123,3 +123,56 @@ jobs: tags: ${{ steps.options.outputs.tags }} cache-from: type=gha cache-to: type=gha,mode=max + + docker_copier: + runs-on: ubuntu-latest + if: inputs.name || inputs.new_tag + env: + IMAGE_NAME: reearth/reearth-cms-copier + steps: + - name: Checkout + uses: actions/checkout@v4 + - name: Set up QEMU + uses: docker/setup-qemu-action@v3 + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 + - name: Login to DockerHub + uses: docker/login-action@v3 + with: + username: ${{ secrets.DOCKERHUB_USERNAME }} + password: ${{ secrets.DOCKERHUB_TOKEN }} + - name: Get options + id: options + env: + TAG: ${{ inputs.new_tag_short }} + NAME: ${{ inputs.name }} + SHA: ${{ inputs.sha_short }} + run: | + if [[ -n $TAG ]]; then + PLATFORMS=linux/amd64,linux/arm64 + VERSION=$TAG + TAGS=$IMAGE_NAME:$TAG + if [[ ! $TAG =~ '-' ]]; then + TAGS+=,${IMAGE_NAME}:${TAG%.*} + TAGS+=,${IMAGE_NAME}:${TAG%%.*} + TAGS+=,${IMAGE_NAME}:latest + fi + else + PLATFORMS=linux/amd64 + VERSION=$SHA + TAGS=$IMAGE_NAME:$NAME + fi + echo "platforms=$PLATFORMS" >> "$GITHUB_OUTPUT" + echo "version=$VERSION" >> "$GITHUB_OUTPUT" + echo "tags=$TAGS" >> "$GITHUB_OUTPUT" + - name: Build and push docker image + uses: docker/build-push-action@v6 + with: + context: ./worker + file: ./worker/copier.Dockerfile + platforms: ${{ steps.options.outputs.platforms }} + push: true + build-args: VERSION=${{ steps.options.outputs.version }} + tags: ${{ steps.options.outputs.tags }} + cache-from: type=gha + cache-to: type=gha,mode=max diff --git a/server/e2e/integration_model_test.go b/server/e2e/integration_model_test.go index f9e687d16d..f6aff37a5d 100644 --- a/server/e2e/integration_model_test.go +++ b/server/e2e/integration_model_test.go @@ -57,6 +57,91 @@ func TestIntegrationModelGetAPI(t *testing.T) { obj.Value("lastModified").NotNull() } +// POST /models/{modelId}/copy +func TestIntegrationModelCopy(t *testing.T) { + endpoint := "/api/models/{modelId}/copy" + e := StartServer(t, &app.Config{}, true, baseSeeder) + + e.POST(endpoint, id.NewModelID()). + Expect(). + Status(http.StatusUnauthorized) + + e.POST(endpoint, id.NewModelID()). + WithHeader("authorization", "secret_abc"). + Expect(). + Status(http.StatusUnauthorized) + + e.POST(endpoint, id.NewModelID()). + WithHeader("authorization", "Bearer secret_abc"). + Expect(). + Status(http.StatusUnauthorized) + + oldModelId := mId1.String() + oldModel := e.GET("/api/models/{modelId}", oldModelId). + WithHeader("authorization", "Bearer "+secret). + Expect(). + Status(http.StatusOK). + JSON(). + Object() + + newName := "new name" + newKey := id.RandomKey().Ref().StringRef() + newModel := e.POST(endpoint, oldModelId). + WithHeader("authorization", "Bearer "+secret). + WithJSON(map[string]interface{}{ + "name": newName, + "key": newKey, + }). + Expect(). + Status(http.StatusOK). + JSON(). + Object() + + newModel. + ContainsKey("id"). + ContainsKey("projectId"). + ContainsKey("schemaId"). + ContainsKey("public"). + ContainsKey("createdAt"). + ContainsKey("updatedAt"). + ContainsKey("key") + + newModelID := newModel.Value("id").String() + newModelID.NotEqual(oldModelId) + copiedModel := e.GET("/api/models/{modelId}", newModelID.Raw()). + WithHeader("authorization", "Bearer "+secret). + Expect(). + Status(http.StatusOK). + JSON(). + Object() + copiedModel. + HasValue("id", newModelID.Raw()). + HasValue("projectId", oldModel.Value("projectId").String().Raw()). + HasValue("public", oldModel.Value("public").Boolean().Raw()). + HasValue("name", newName). + HasValue("key", newKey). + HasValue("description", oldModel.Value("description").String().Raw()) + + copiedModel.Value("schemaId").NotNull() + oldSchemaId := oldModel.Value("schemaId").String() + copiedSchemaId := copiedModel.Value("schemaId").String() + copiedSchemaId.NotEqual(oldSchemaId.Raw()) + + oldSchema := oldModel.Value("schema").Object() + copiedSchema := copiedModel.Value("schema").Object() + copiedSchema.Value("fields").Array().Length().IsEqual(oldSchema.Value("fields").Array().Length().Raw()) + copiedSchema.Value("titleField").String().IsEqual(oldSchema.Value("titleField").String().Raw()) + + copiedModel.Value("metadataSchemaId").NotNull() + oldMetadataSchemaId := oldModel.Value("metadataSchemaId").String() + copiedMetadataSchemaId := copiedModel.Value("metadataSchemaId").String() + copiedMetadataSchemaId.NotEqual(oldMetadataSchemaId.Raw()) + + oldMetadataSchema := oldModel.Value("metadataSchema").Object() + copiedMetadataSchema := copiedModel.Value("metadataSchema").Object() + copiedMetadataSchema.Value("fields").Array().Length().IsEqual(oldMetadataSchema.Value("fields").Array().Length().Raw()) +} + // PATCH /models/{modelId} func TestIntegrationModelUpdateAPI(t *testing.T) { endpoint := "/api/models/{modelId}" diff --git a/server/internal/adapter/integration/model.go b/server/internal/adapter/integration/model.go index 02b74afca7..4c4e96a7dd 100644 --- a/server/internal/adapter/integration/model.go +++ b/server/internal/adapter/integration/model.go @@ -112,6 +112,38 @@ func (s *Server) ModelGet(ctx context.Context, request ModelGetRequestObject) (M return ModelGet200JSONResponse(integrationapi.NewModel(m, sp, lastModified)), nil } +func (s *Server) CopyModel(ctx context.Context, request CopyModelRequestObject) (CopyModelResponseObject, error) { + uc := adapter.Usecases(ctx) + op := adapter.Operator(ctx) + + m, err := uc.Model.Copy(ctx, interfaces.CopyModelParam{ + ModelId: request.ModelId, + Name: request.Body.Name, + Key: request.Body.Key, + }, op) + if err != nil { + if errors.Is(err, rerror.ErrNotFound) { + return CopyModel404Response{}, err + } + return CopyModel500Response{}, err + } + + sp, err := uc.Schema.FindByModel(ctx, m.ID(), op) + if err != nil { + if errors.Is(err, rerror.ErrNotFound) { + return CopyModel404Response{}, err + } + return CopyModel500Response{}, err + } + + lastModified, err := uc.Item.LastModifiedByModel(ctx, m.ID(), op) + if err != nil && !errors.Is(err, rerror.ErrNotFound) { + return CopyModel500Response{}, err + } + + return CopyModel200JSONResponse(integrationapi.NewModel(m, sp, lastModified)), nil +} + func (s *Server) ModelGetWithProject(ctx context.Context, request ModelGetWithProjectRequestObject) (ModelGetWithProjectResponseObject, error) { uc := adapter.Usecases(ctx) op := adapter.Operator(ctx) diff --git a/server/internal/adapter/integration/server.gen.go b/server/internal/adapter/integration/server.gen.go index 3f010f7d53..048e1b8ba3 100644 --- a/server/internal/adapter/integration/server.gen.go +++ b/server/internal/adapter/integration/server.gen.go @@ -75,6 +75,9 @@ type ServerInterface interface { // Update a model. // (PATCH /models/{modelId}) ModelUpdate(ctx echo.Context, modelId ModelIdParam) error + // Copy schema and items of a selected model + // (POST /models/{modelId}/copy) + CopyModel(ctx echo.Context, modelId ModelIdParam) error // Import data under the selected model // (PUT /models/{modelId}/import) ModelImport(ctx echo.Context, modelId ModelIdParam) error @@ -507,6 +510,24 @@ func (w *ServerInterfaceWrapper) ModelUpdate(ctx echo.Context) error { return err } +// CopyModel converts echo context to params. +func (w *ServerInterfaceWrapper) CopyModel(ctx echo.Context) error { + var err error + // ------------- Path parameter "modelId" ------------- + var modelId ModelIdParam + + err = runtime.BindStyledParameterWithOptions("simple", "modelId", ctx.Param("modelId"), &modelId, runtime.BindStyledParameterOptions{ParamLocation: runtime.ParamLocationPath, Explode: false, Required: true}) + if err != nil { + return echo.NewHTTPError(http.StatusBadRequest, fmt.Sprintf("Invalid format for parameter modelId: %s", err)) + } + + ctx.Set(BearerAuthScopes, []string{}) + + // Invoke the callback with all the unmarshaled arguments + err = w.Handler.CopyModel(ctx, modelId) + return err +} + // ModelImport converts echo context to params. func (w *ServerInterfaceWrapper) ModelImport(ctx echo.Context) error { var err error @@ -1540,6 +1561,7 @@ func RegisterHandlersWithBaseURL(router EchoRouter, si ServerInterface, baseURL router.DELETE(baseURL+"/models/:modelId", wrapper.ModelDelete) router.GET(baseURL+"/models/:modelId", wrapper.ModelGet) router.PATCH(baseURL+"/models/:modelId", wrapper.ModelUpdate) + router.POST(baseURL+"/models/:modelId/copy", wrapper.CopyModel) router.PUT(baseURL+"/models/:modelId/import", wrapper.ModelImport) router.GET(baseURL+"/models/:modelId/items", wrapper.ItemFilter) router.POST(baseURL+"/models/:modelId/items", wrapper.ItemCreate) @@ -2253,6 +2275,54 @@ func (response ModelUpdate401Response) VisitModelUpdateResponse(w http.ResponseW return nil } +type CopyModelRequestObject struct { + ModelId ModelIdParam `json:"modelId"` + Body *CopyModelJSONRequestBody +} + +type CopyModelResponseObject interface { + VisitCopyModelResponse(w http.ResponseWriter) error +} + +type CopyModel200JSONResponse Model + +func (response CopyModel200JSONResponse) VisitCopyModelResponse(w http.ResponseWriter) error { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(200) + + return json.NewEncoder(w).Encode(response) +} + +type CopyModel400Response struct { +} + +func (response CopyModel400Response) VisitCopyModelResponse(w http.ResponseWriter) error { + w.WriteHeader(400) + return nil +} + +type CopyModel401Response = UnauthorizedErrorResponse + +func (response CopyModel401Response) VisitCopyModelResponse(w http.ResponseWriter) error { + w.WriteHeader(401) + return nil +} + +type CopyModel404Response = NotFoundErrorResponse + +func (response CopyModel404Response) VisitCopyModelResponse(w http.ResponseWriter) error { + w.WriteHeader(404) + return nil +} + +type CopyModel500Response struct { +} + +func (response CopyModel500Response) VisitCopyModelResponse(w http.ResponseWriter) error { + w.WriteHeader(500) + return nil +} + type ModelImportRequestObject struct { ModelId ModelIdParam `json:"modelId"` JSONBody *ModelImportJSONRequestBody @@ -3741,6 +3811,9 @@ type StrictServerInterface interface { // Update a model. // (PATCH /models/{modelId}) ModelUpdate(ctx context.Context, request ModelUpdateRequestObject) (ModelUpdateResponseObject, error) + // Copy schema and items of a selected model + // (POST /models/{modelId}/copy) + CopyModel(ctx context.Context, request CopyModelRequestObject) (CopyModelResponseObject, error) // Import data under the selected model // (PUT /models/{modelId}/import) ModelImport(ctx context.Context, request ModelImportRequestObject) (ModelImportResponseObject, error) @@ -4289,6 +4362,37 @@ func (sh *strictHandler) ModelUpdate(ctx echo.Context, modelId ModelIdParam) err return nil } +// CopyModel operation middleware +func (sh *strictHandler) CopyModel(ctx echo.Context, modelId ModelIdParam) error { + var request CopyModelRequestObject + + request.ModelId = modelId + + var body CopyModelJSONRequestBody + if err := ctx.Bind(&body); err != nil { + return err + } + request.Body = &body + + handler := func(ctx echo.Context, request interface{}) (interface{}, error) { + return sh.ssi.CopyModel(ctx.Request().Context(), request.(CopyModelRequestObject)) + } + for _, middleware := range sh.middlewares { + handler = middleware(handler, "CopyModel") + } + + response, err := handler(ctx, request) + + if err != nil { + return err + } else if validResponse, ok := response.(CopyModelResponseObject); ok { + return validResponse.VisitCopyModelResponse(ctx.Response()) + } else if response != nil { + return fmt.Errorf("unexpected response type: %T", response) + } + return nil +} + // ModelImport operation middleware func (sh *strictHandler) ModelImport(ctx echo.Context, modelId ModelIdParam) error { var request ModelImportRequestObject @@ -5183,79 +5287,79 @@ func (sh *strictHandler) ProjectFilter(ctx echo.Context, workspaceId WorkspaceId // Base64 encoded, gzipped, json marshaled Swagger object var swaggerSpec = []string{ - "H4sIAAAAAAAC/+w9W2/bONZ/RdA3j2qc2c6+5C2bNIPMTptgk+7gQ1EUtHRscyOTLknlsoH/+4I3XSxK", - "omw5sRO/tLFEUofnfhP1HMZ0vqAEiODhyXO4QAzNQQBTvxDnIC6Ta3lR/k6AxwwvBKYkPAkvzwM6CcQM", - "Ag4pxAKSQE0IoxDL+wskZmEUEjSH8MSuFUYhg58ZZpCEJ4JlEIU8nsEcyfXF00IO5YJhMg2j8PHDlH4w", - "F3FydKqWOA+Xy0gv1wDYzQJiPMHAg4cZiBkwDVeQIIECxCCA+RiSBJIAEwU/A56lglvAf2bAnlYgD8tw", - "/sJgEp6E/zcqkDfSd/lIjf6kHiA3IWGN6XwOpBcizRQ3KvP1NkHmmVlEo3OCIU0ukyv2T3hqgZIFd/Bk", - "gVVzLArnNIGUB+bxTrDLz1gbcj3q6EKtda7XkhvAAuZ9ECzHu8HUK22C2ku5gsbrHTw9UNYEl7kb5Au5", - "2M8MCpsBkA9S+O9JQDXHEnDB6H8gbuC48uprY0YtclQmmlm2k2q9Ad2Eep/VEpp8CzSFBui+ckgCQQ1H", - "acjQFBqIaG4VQCQwQVkqwpNfo3COCZ5nc/W3hYMImALTQAC7HgwOvZYblL8fR+EcPRpYjo+7IdOkkIxx", - "mmLEWxkPyRGWoq1EXF12bWqahRTP6ZUqUPtrCz9wW+Fc4bJrM0nzGYOJH3lRwGAisXkPrIHE0jY5yRum", - "SACXmwAiafqtuLDIximOw++RQ7PolXywpQZWDIIbYXbFTaT0Rq+h0ccpE+eYdaAwgQkmoICjLAEWJJhB", - "LAfZHTDgC0o4BCnmIgoecJoGYwjwlFAmbcakNBnzgFARLBhwIAKSBmokmDVQQwJZogVSv9RFNxkoE303", - "6NpWA5xy+QZAYwZIQHJa5pzytWyRmL+dgD9QdscXKIY+ApdPcnNQaU1voUNxTDMiEjpHmBz9la8gWUiJ", - "oEaScny/UHFBM5J8YoyyOsC3Cqk/M+ASVgacZiyG4AFpnpjIqeEyCr8SlIkZZfi/0LTUaRwD54Ggd0Ak", - "T80x55hMpYhjco9SnJSEUMF2AUhkDJS3zugCmMAa6CnQOQj21OWh/m7HSbcp6eHPRCsPNCPoWOnGpSX+", - "c84lFlQnX9Rmm9FnNE21WNa3ONFD1N/ST+Nde7UQFM9DjKGnFmBLj/cD+3egf9xcfdkbYHMeqUIbU8oS", - "TKRFkD8pgatJePKtHeJriolct33U5ywV2G/on5jAjYHfZ9Ue469p+jSlxBdaM/j7MrKChXuQsixjXbTU", - "mInCEpqisLQxc6dyxcKXz7I/7YN7c0Zped9NWpJKN/FST/hbfburwPuuXiGte1UNQG9wG9bSKPRfzbJT", - "bb06WBPK5kgZfZqNU2nUzBySzcfSmVaOt8Hhxw6EuiDdDAHF436r39Tpj5q+QCye4Xv49CgYUnx2I5DI", - "eJmxF0ASG9f+WDA6ZcClM59QIlEwQTiFxMGeURhTIoCIWyMp9fu5+1FBLhLwQeB5Cb/FlAlOoQtBaoyv", - "VcyzUdYrccC5YHCP4eF2ReLx3ERo8v8f/F6uPgWq//3xMflxi1Pg5uf8XuoD5U7/+CjdnZjfS6+L3BH6", - "QJzoKyKS7m2UApEoFFSg9Ab/t7ybgkULR88b6xlL3fmKwmf7JtEdVaIoOSvq9DEL3bWScythGqVyJekW", - "KoZLOTTwm0631blc+W/diEREy4oavkrujJtgTcCUCeTWyTnTD8XwXkxczQLWEBtTkmC3K4ZI4q14imUc", - "ymeMOI4d3pPOFnaLLKTJjQobqGJSuQYS2tW2BICfGUqlPBEqPum/XQS4R2kmCedExZjSdKegtHckYIBI", - "TaosaKWH2ckuGZpLI7hIYbt7xCROswT4KXnSG72sXMhvK7Et307TdmRYPqwx2GZYIVmaovG2sQLzhTD4", - "+KT+9HPZjGbeKmhTpXjY7QxJ7zIFzs2fpRtXTLHrLS2NKK758LC1MZsRS0O+uUbiuaO6PbxKZY8wMeJ+", - "VvziAjHB/8Iq3QEksX8SKm7KtySv2Ls+KG6wvT1RrIzNVhEzhgll0qChiVBmU1+4YlfEXjR/08ntDPO/", - "AO7yH58pUcjRv/4fEGvHjY8l3QRhLqlVCziyN4xmC89kzO9yrPbYvKy8qZaFuiDldDBsUNpGP7VL5dx0", - "Gcsqpdv4xR/y1bBZeY3KL8KUnCMBpZ9ftcc1pwme4Lg8onzJjOI6cLGUicI5CKQe7KmHbWixklCZ4TRh", - "4B9R2uhjVR11BUPN0QcSM+cN7vbwXXvTafz65vr7o5W85/Oa/mpenGvm5BRx8VlRGRJ/6CTNEyTQjVeR", - "32SYa/O8eLooXbRGjmuGcKaU4/APfTsYis3x9TbVO050MZ4tudUDD1XIGyhIGoQpK/hvpOga0XO5zNEV", - "gLZUNxzYLTihitxbLFK4sBbKXzmvlZCBNPFPXun/NWgOJYn7MumaAtaMzAu3XR/KQJfjs7pkFx6K625P", - "4968R3eh4xfXHpdR+Etjw0q34K3IfKK9dZReV5/cyTAS4tKcpdOnFSm0OkXtHqG6W4H4eysCq1uobrQL", - "LVa0XKrKX4oUGXXO+U8g04qXkPd3lHpB/BLYtlfEa/QgSHfhueDjkq8o4FFIKOBRnDJAYRQyHM9u9dU5", - "YncJfZAhRTyD+G5MH8Mo7/pLtN+o0j9RqAvUNpmn3EezJ9XEAQyIqlnrDKb26aNQIJPhVaWZq7HpJbEX", - "PiVYusnOSAUYx5RAIt3+Qdywnmp3spHCLSrHmH82npJbRVk/6mIg8GwjmNsvZrbftV6hyTIVWDSwZfGA", - "nNrJZa8iTJWi7oWrq3WCsoZnYaDw2PnSFdRyiDOGxZMyqJoVx4AYsNNMKxO1W0VidblYdibEQjdcYDKh", - "9X6If8EnxMTsw9nnm+BSJcxVqBacXl+GudboGJVvLvz16Pjo2GQaCFrg8CT8eHR89DHUwZECXHfs8tGz", - "6VBeaqBSEEqF6CgfUyKZKVRFn3N9c6Vn5G/Hx7qOnmfx0WKRmlBz9B+usd3kePUrOTmIsmpTtQLjuudn", - "GYW/afBWOm90i4ltZgny9u9Ah/dq3q9NLJ1vf1RvdFEzf6s/8UvRHyP5KJvPEXtS3VASp3kDuUBTLhW3", - "2jEPdRVeNNDjdzVlI2J0dnTvP4anINrQW+78b+j6KIaMKm8GqCaJmhiNTFHN9CY1Ec9UoP7UrWkDSlT5", - "8Z45YV0EdJUQ/KTNtu3vOk8Y7a0IXdbb374vvztZJt9YjXeKOxszURQuKO9gkzPl8JieP+DiHzR52ohH", - "mkqubppXOw2XW1Q6OTPWWW3/+Uq7rX1Yq1XBjJ7zN2K6jbdhpFez4a0V9zqxrV0kVWztPw+UDf5LqJeo", - "c/zKa1pKISERz9oZ6esiefcaSeMgOH1zTGo3VqJ3p5pSvsboWb9G1qqPZAT4anqo9JJaDyWETdD6NnQP", - "sa8BWorqUL4UbawGpyJjhNuJR3kRuExRHY30U1X5+z8eaqr05qvc1dbkfSVRUeeK011mhyj8uxsmAYyg", - "NODA7oEFoNfrwzwOJqizTz/yl19erZgdp5pt5b6BzVHe/NvnlefhkozDZgVf24IeJKrVyrYIVN2udmcX", - "5Nw3klyQT3iTuQVN8dJpBxXCb+L811SqM7VQ4pFDZuEtZRb8GatFtfjmFUpctH9phQqe3opn/yJaZciM", - "QomFDgmFckLhbbGn2ZekdnDmp5v0wTqjZ1PQblVEqjvy1VRQ+eAUbwVkzol4BcqukzDIT7WwJFN79skY", - "6Jn1mE0tsOUCpkGxI94I/ri5+hIoTzSgkyDjwAKC5rpR6l0G9QWdHCTuZywqpxt5hPWtLDKwVejqM2vq", - "Qmzobn1tU9LF4RoqyeJ7oW7qHFFjRpdpGOH5gjKxeo7fOpyaiQbrcqkfMWh+6bLn69ZFE2T+lhrQP7g6", - "/0A90NVJZ5vtVCrpn41NtgIJuFltWy230AuGBEyfqi81cmDFW8rqD3Xle9fLRMWhiGZPpQc43ybSXcCI", - "iZGc8MH20zWh176Zkvd4jTFB6tifls7Sd4bUwRXViuukD486o1nFCS+12eqdtg+RnmHL/cZux7YXaAg8", - "XAzag2/6EBvh9PELawpb67TAvKT1Km7JtpwOrUv1saQZSYBVz8Bq9DXdut8SsMMPTTHXeFXBxgMWs2CC", - "UwGSXQJEEn10GCZTd33hQo3tXeAqTi/zCJsrx7l5jC9OafQZXD5N0WP8uqW57tGVs0F1KW8Ik6qp2eMF", - "aJdYDqwDh+2QVudbnjy3nk6ZH5vZPVAdepIrrXzscbSJAjvEVE06Z4iCad1hbU7vD5vXP1Q216ts7pRU", - "rF9RaKhNuq3xUczvPSzy2c2/AzFDIpihuoFGPLDnCLoNMj/lZzf/7m2QX8hmdjeoCHgUI4Oogtk6AxYX", - "i+l7ASYKpWaJd6t0+7BVNQNdnHYsOWtj7dwiIFOgVtF0CIk5ZXMzQbFHde6tsKyvoe3WnYJjkVu8Rfk+", - "RaYvk1VNQXEU7HZExroMP8w3EDwlx06z53MjHsiZwfjJnKt/eV5POVdOtfjHkwo9T7kRnq0xafnt5I7U", - "wDsuUHjRM08aVM81icItMmg/vuzBjgc23D029OK+LXCdcU746Hn1MxFLw5E9kmB6QkPJLU91DZgJKSD0", - "CvfyatUhBfJGUiAFx21cX3Z/fWW7+dLGVIvaxcC5lkOReh8SI+11im51nTsQ6rtQK/1N1Z2fV9pwGtS2", - "HvQXFrPrPI7d6Vao2+ILV8l7U5B1ig7eWTUgJxyarHauyWptI1j/YN4wLVqr7HYwhPtoCHfhdaiO7q/e", - "lnVU1K5eV8acDqQqhGkHchsitJPHKr6s2FW6dzyEL6817oUXag9vtrJiCqubycroufzR2lbf1Bq30pdx", - "k7qhUFDtgoeanzjqF9DbHb1b/1Qh4MjFX6/psXRPqn/YueUtJLWn7bkxBx38NnVwZh2WgXXwi7Y0Vhn+", - "0N243YNHDv2Bh3SAb39g3o/y+tkBE7lUUXGWd4hdNh7Lsr3A5tCQ+M4aEuvs1iQtG1jdl2ldLMnDoYvx", - "0MW4W12Mg9qOTUTxRZskKyJ56Jc89EturV+yJKDr903ugJAO35ZpnqxC134tmhXpPbTJ7Xa3ZgOZh+7c", - "3AER2bQx1EsgDoKwZ/2iHfy/V3yvd6cPY/Bi76MG/j00nQ6UVzP8pifw9y53RzXBEmhPWk57v0jvK6uj", - "Z/t902EtlLnbYqIuzw/2ab/sU5mmr26gLNt2MPzSfDGlRwXNfNGqVwlNHcz07o4FcRze8UrlMPN5sj0z", - "13vx7bT1K1l6e0f1j4UMUMvyrUcFBB7MB1mkBDMFpOpPMp8N1TcbJHrgdzn4HV6cg9wZA24/PZnABGWp", - "CE8mKOUQhSRLUzROQdeHIle7B70Ddw9sxlK/VtdtHqLmtcvVXW3j/J0Nvme4/+JYVMhyCWiVxA77OcoW", - "KUWmcdYpcZecZ1Lgvv7rTyVqKFB8Ggga6Ln597sahO2rGpWL3MaKYdCjo9s+ER1njFO2afP5cuBvM3SD", - "rYfcuj81HYUEHt0f2R5A/zR8OlIzyls44rrG8K2C54rG1m1UX3WMPXrND82Nh+bGARrMm7m4tYW8sTl8", - "9zvC95GWSaWbe4hm7hWNs71+7IOeOuipAZqwt5H79Ml3HpKcO5rk3EZi05WffH6g7I4vUAyS5Wy01SM1", - "mU9ZZTKTRt9G4WzgDFp5116pPespO5J72y2e5ZC+pqS0z/xCxYUUnK2f0t3MiuXY5tpirL+4lCTjRY5w", - "WS6X/wsAAP//hHQYUfexAAA=", + "H4sIAAAAAAAC/+xd23LbONJ+FRb/uWQsz2b2xndeO5ny7GTiWjs79ddUKgWTLQlrCmAA0I7WpXffwokH", + "ESRBibIlWzeJRQJgo/vrAxpN8CmM6SKjBIjg4dlTmCGGFiCAqV+IcxBXybW8KH8nwGOGM4EpCc/Cq8uA", + "TgMxh4BDCrGAJFAdwijE8n6GxDyMQoIWEJ7ZscIoZPA9xwyS8EywHKKQx3NYIDm+WGayKRcMk1kYhT/e", + "zeg7cxEnJ+dqiMtwtYr0cC2E3WQQ4ykGHjzOQcyBabqCBAkUIAYBLO4gSSAJMFH0M+B5Krgl/HsObLlG", + "eVil8ycG0/As/L9JybyJvssnqvUH9QA5CUlrTBcLIIMYabq4WVmMtw0zL8wgmp1TDGlylXxm/4RlB5Us", + "uIelJVb1sSxc0ARSHpjHO8muPmNjynWrk49qrEs9lpwAFrAYwmDZ3k2mHmkb1l7JETRf72H5SFkbXeZu", + "UAzkgp9pFLYTIB+k+D9QgKqPFWDG6H8gbkFcdfSNOaMGOakKzQzbK7XBhG4jvU9qCC2+DM2ghbovHJJA", + "UIMoTRmaQYsQza2SiASmKE9FePZzFC4wwYt8of62dBABM2CaCGDXo9Ghx3KT8vfTKFygH4aW09N+yrQo", + "JDDOU4x4J/CQbGEl2inE9WE3lqYZSGFOj1Sj2t9a+JHbSecayq5NJ40zBlM/8aKAwVRy8wFYi4ilb3KK", + "N0yRAC4nAUTK9K/yQpbfpTgOv0YOy6JH8uGWalhzCG6G2RG30dIbPYZmH6dMXGLWw8IEppiAIo6yBFiQ", + "YAaxbGRnwIBnlHAIUsxFFDziNA3uIMAzQpn0GdNKZ8wDQkWQMeBABCQt0kgwa5GGJLIiC6R+qYtuMVAm", + "hk7QNa0WOuXwLYTGDJCA5LyKnOq1PEvM307CHym75xmKYYjCFZ3cCKqM6a10KI5pTkRCFwiTkz+LESSE", + "lApqJqnA9w8qPtKcJB8Yo6xJ8K1i6vccuKSVAac5iyF4RBoTU9k1XEXhF4JyMacM/xfahjqPY+A8EPQe", + "iMTUAnOOyUyqOCYPKMVJRQkVbR8BiZyBitYZzYAJrImeAV2AYMu+CPVX206GTcmAeCZae6BpQe+UbVxZ", + "4T8VKLGkOnHR6G1aX9A01WrZnOJUN1F/yziN983VUlA+DzGGlh3EVh7vR/avQH+7+fzHwRBbYKRObUwp", + "SzCRHkH+pAQ+T8Ozv7opvqaYyHG7W33KU4H9mv6OCdwY+n1GHdD+mqbLGSW+1JrGX1eRVSw8QJRVHeuT", + "peZMFFbYFIWViZk7tSuWvqKX/WkfPBgZleF9J2lFKsPEK93hb83prhPvO3pNtO5RNQGDyW0ZS7PQfzQL", + "p8Z4TbKmlC2Qcvo0v0ulUzN9SL64k8G0CrwND9/3MNRF6XYMKB/3S/OmTn807AVi8Rw/wIcfgiGFsxuB", + "RM6rwM6AJHZd+y1jdMaAy2A+oUSyYIpwCokDnlEYUyKAiFujKc37RfhRYy4S8E7gRYW/ZZcpTqGPQaqN", + "r1csslE2KnHQmTF4wPB4u6bxeGFWaPL/b/xBjj4Dqv/99j75dotT4Obn4kHaAxVOf3svw52YP8ioi9wT", + "+kic7CtXJP3TqCxEolBQgdIb/N/qbEqIloGeN9dzlrrzFWXM9pdkd1RbRcleUW+MWdqutZxbhdMolSPJ", + "sFABLuXQgjedbmuiXMVv/YxEROuKar4u7pybxZqAGRPIbZML0I8FeC8Q17OADcbGlCTYHYohkngbnnIY", + "h/G5QxzHjuhJZwv7VRbS5EYtG6gCqRwDCR1qWwHA9xylUp8IFR/03y4BPKA0l4JzsuKO0nSvqLR3JGGA", + "SEOrLGmVh9nOLh1aSCeYpbDbOWISp3kC/Jws9USvaheK20ptq7fTtJsZFocNgG3HFZKnKbrbNVdgkQnD", + "jw/qT7+QzVjmnZI2U4aH3c6RjC5T4Nz8WbnxmSm43tJKi/KaD4atj9lOWJry7S0SLwLV3fFVGnuEiVH3", + "i/IXF4gJ/idW6Q4gif2TUHFTvSWxYu/6sLjF9w5ksXI2O2XMHUwpkw4NTYVym/rCZ/aZ2Ivmbzq9nWP+", + "J8B98eMTJYo5+tf/A2LdvPHxpNswzKW1agBH9obRPPNMxvwq2+qIzcvLm92yUG9IOQMMuyjtkp+apQpu", + "+pxlXdJdePGnfH3ZrKJGFRdhSi6RgMrPLzriWtAET3FcbVG9ZFpxvXCxkonCBQikHuxph+3SYi2hMsdp", + "wsB/RWlXH+vmqG8x1L76QGLuvMHdEb5rbjqN35zc8Hi0lvd82jBeLTbn2pGcIi4+KSlD4k+dlHmCBLrx", + "2uQ3GeZGPy9Ml1sXnSvHDZdwZivHER/6VjCUk+ObTWrwOtEFPLvl1lx4qI28kRZJo4Cyxv9WiW6weq5u", + "c/QtQDt2NxzcLZGwtV4rq+mfhNL/a/PqMHZ4KNg2z3VgkcJH64x9/VA7Mz+6/fpYDrq6PmtqdhmhuO4O", + "dO7tc3RvdPzkmuMqCn9qLVjpV7w1nU90tI7S6/qTe4EmKa70WTljWpFCZ1DUHRGquzWKv3YysD6F+kT7", + "2GJV0mWq/LVPiVHnnH8HMqtFCUV9R6UWxC+BbWtFvFqPwnQXn0scV2JFAT+EpAJ+iHMGKIxChuP5rb66", + "QOw+oY9ySRHPIb6/oz/CqKj6S3TcqNI/Uag3qG0yT4WPZk6qiAMYELVnrTOYOqaPQoFMhldtzXy+M7Uk", + "9sKHBMsw2blSAcYxJZDIsP8lzPV0K0Nd7hxj/slESm4TZeOojyORZwvB3HExs/WuzR2aPFcLixZYlg8o", + "pJ1cDdqEqUvUPXB9tF5SNogsDBUeM1+5FrUc4pxhsVSOWEPxDhADdp5rY6Jmq0SsLpfDzoXIdMEFJlPa", + "rIf4F3xATMzfXXy6Ca5Uwlwt1YLz66uwsBo9rYrJhT+fnJ6cmkwDQRkOz8L3J6cn70O9OFKE64pdPnky", + "FcorTVQKQpkQvcrHlEgwhWrT51LfXKsZ+dvpqd5HL7L4KMtSs9Sc/IdrbrcFXsO2nBxCWfep2oBxXfOz", + "isJfNHlrlTe6xMQWswRF+Xegl/eq389tkC6mP2kWuqievzSf+EdZHyNxlC8WiC1VNZTkaVFALtCMS8Ot", + "ZsxDvQsvWuTxq+qylTB6K7oPn8MzEF3srVb+t1R9lE0mtTcDVJFEQ40mZlPN1Ca1Cc/sQP2uS9NG1Kjq", + "4z1zwnoT0LWF4Kdttmx/3zFhrLcSdNVu//V19dUJmWJiDeyUd7YGURRmlPfA5EIFPKbmD7j4B02WW2Gk", + "bcvVLfN6peFqh0anAGMTaoePKx22DoFWp4GZPBVvxPQ7bwOkF/PhnTvuTWFbv0jq3Dp8DFQd/nOYl6i3", + "/dprWsogIRHPu4H0JUvevEXSPAjOXx1I7cQq8u41UyrWmDzp18g67ZFcAb6YHaq8pDbACGGzaH0dtofY", + "1wCtRPVSvrLaWF+cipwRbjueFJvAVYnq1cgwU1W8/+NhpipvvspZ7Uzf1xIVTVSc7zMcovDvbpoEMILS", + "gAN7ABaAHm8IeBwgaMJnmPirL6/W3I7TzHaib2R3VBT/Dnnlebwk47hZwZf2oEeN6vSyHQrV9Kv92QXZ", + "95UkF+QTXmVuQUu8ctpBTfDbBP8Nk+pMLVQwcswsvKbMgj+wOkyLb16hgqLDSyvU+PRaIvtnsSpjZhQq", + "EDomFKoJhdcFTzMvKe3gws826YN1Jk9mQ7vTEKnqyBczQdWDU7wNkDkn4gUku0nCoDjVwopMzdknY6B7", + "NtdsaoAdb2AaFjvWG8FvN5//CFQkGtBpkHNgAUELXSj1Jhf1pZwcIh7mLGqnG3ks6zshMrJX6Ksza6tC", + "bKlufWlX0odwTZWE+EGYmyYiGmB0uYZJTLPl+il+m+DUuVa6oNnykzF/44BwBJDtB6iKfNOLmczunvXj", + "bHZoRiVG7DlQiOhT9tSZX6g82qfVhTohjRcZZWIMUOeiJWC60o8YNWV6NfAEgbKut3jxEuhvXB3poR7o", + "Kg619aMqO/rP1rpxgQTcrFdiV98KEQwJmC3r7+lyYOWL9+oPdeVr3/tx5TmfZk6VBzhfkNOF7YiJiezw", + "zpaItrHXvmxVlC3eYYLUSVYdxdJvjKmj+9611YA+D+2C5rV1ZaVyXM+0u4k0DR33Wwt4u94JI/D4cdTX", + "UUxpbSudPkudhrvQNi0w7x2+iNvYlQPQtlSftJuTBFj9WLeBtt8KsGdplWKu+aq8zSMW82CKUwESLsoN", + "qdPwMJm5t8w+qraD92zLA/k8MkG1Ewo92pcHj/o0rh4Q6tF+093m/ta142717vQYLlVLc8A7/bsIFdds", + "4LhF/+rI1rOnzgNXi5Ng+xuqc3wKo1W0PY22MWDHNEGbzRmjBsBzFaYSxeNuVR036zfbrN8rrdh8k6xl", + "u93tjU9i/uDhkS9u/h2IORLBHDUdNOKBPRrT7ZD5Ob+4+fdgh/xMPrO/5krADzExjCrB1rtgcUFM3wsw", + "USw1Q7xZozsEVvVNlfIAb4msra1zh4LMgFpD06Mk5uDY7RTFnj57sMqyuYW2U3cqjmVu+WLw21SZoSCr", + "u4LydOPdqIwNGb6Zz3p4ao7tVqQaeSB7BndL86mIq8vmLkrtoJZ/6FT2OTfKszOQVl+470kNvOE9Ny95", + "FkmD+lE9UbhDgA7D5QA4HmG4fzD0Qt8OUGeCEz55Wv/yycogckASTHdo2UUuUl0jZkJKCr2We8UG7DEF", + "8kpSICXiti6ZcH9QaLf50tZUi5rFyLmWY93FISRGuvcp+s11EUCoT52tlezVZ35ZqyxrMdu60Z9YzK+L", + "dexeV/fdlh9tS96agWxKdPRiwRGRcKwb3Lu6wY2dYPMbkONUHa7D7egID9ER7sMbfj0FjYM966Tcu3pZ", + "HXMGkGojTAeQu1ChvTwp9HnVrla9s8+FmhtFofY8cqsrZmN1O12ZPFW/w9wZm1rnVvnYc9J0FIqqfYhQ", + "O87odUHDzujNxqeKAScufL1kxNLfqfmt8o4X69ScdhfGHG3w67TBuQ1YRrbBz1rSWAf8sbpxt2fpHOsD", + "j+kA3/rAoh7l5bMDZuVSZ8VFUSF21XrS0O4WNseCxDdWkNiEW5u2bOF1n6d0saIPxyrGYxXjflUxjuo7", + "tlHFZy2SrKnksV7yWC+5s3rJioJuXje5B0o6flmmebJaug4r0axp77FMbr+rNVvEPHbl5h6oyLaFoV4K", + "cVSEA6sX7cH/QeFez04fxuAF75MW/B6LTkfKqxm86Q78revdSUOxBDqQktPBL9L76urkyX6yd1wPZe52", + "uKiry6N/Oiz/VJXpizsoC9sewK/MR4AG7KCZj7QN2kJTBzO9uWNBHId3vNB2mPni3oG564P4HODmO1l6", + "eifN79+MsJflux8VEHg03xiSGswUkao+yXwJV99s0eiR3+Xg9zi7BDkzBtx+TTWBKcpTEZ5NUcohCkme", + "puguBb0/FLnKPeg9uGtgc5b6lbru8hA1r1muz+q5j2rs+UTn4atjuUNWaECnJvb4z0mepRSZwlmnxl1x", + "nkuF+/Kv35WqoUDhNBA00H2LT9K1KNsX1apQua0Nw6inoXd99TzOGafspQ8mHU62bnLr/np6FBL44f5u", + "/Aj2p+VrqBoor+HU9gbgOxXPtRrbtFB9PTD2qDU/FjceixtHKDBvR3FnCXlrcfj+V4QfoiyTWjX3GMXc", + "axZnd/XYRzt1tFMjFGHvIvfpk+88Jjn3NMm5i8SmKz/59EjZPc9QDBJydrU1IDVZdFkHmUmj72LjbOQM", + "WnXWXqk9Gyk7knu73TwrKD1+pqEDitW1zbXl2HB1qWjGsxzhslqt/hcAAP//Lbuqn8q0AAA=", } // GetSwagger returns the content of the embedded swagger specification file diff --git a/server/internal/infrastructure/gcp/config.go b/server/internal/infrastructure/gcp/config.go index 777bd64d27..47ab5ea84e 100644 --- a/server/internal/infrastructure/gcp/config.go +++ b/server/internal/infrastructure/gcp/config.go @@ -11,4 +11,5 @@ type TaskConfig struct { DecompressorGzipExt string `default:"gml"` DecompressorMachineType string `default:"E2_HIGHCPU_8"` DecompressorDiskSideGb int64 `default:"2000"` + CopierImage string `default:"reearth/reearth-cms-copier"` } diff --git a/server/internal/infrastructure/gcp/taskrunner.go b/server/internal/infrastructure/gcp/taskrunner.go index c9e3244217..0ae8c6cf8b 100644 --- a/server/internal/infrastructure/gcp/taskrunner.go +++ b/server/internal/infrastructure/gcp/taskrunner.go @@ -71,6 +71,13 @@ func (t *TaskRunner) Retry(ctx context.Context, id string) error { } func (t *TaskRunner) runCloudBuild(ctx context.Context, p task.Payload) error { + if p.DecompressAsset != nil { + return decompressAsset(ctx, p, t.conf) + } + return copy(ctx, p, t.conf) +} + +func decompressAsset(ctx context.Context, p task.Payload, conf *TaskConfig) error { if p.DecompressAsset == nil { return nil } @@ -80,25 +87,25 @@ func (t *TaskRunner) runCloudBuild(ctx context.Context, p task.Payload) error { return rerror.ErrInternalBy(err) } - src, err := url.JoinPath("gs://"+t.conf.GCSBucket, "assets", p.DecompressAsset.Path) + src, err := url.JoinPath("gs://"+conf.GCSBucket, "assets", p.DecompressAsset.Path) if err != nil { return rerror.ErrInternalBy(err) } - dest, err := url.JoinPath("gs://"+t.conf.GCSBucket, "assets", path.Dir(p.DecompressAsset.Path)) + dest, err := url.JoinPath("gs://"+conf.GCSBucket, "assets", path.Dir(p.DecompressAsset.Path)) if err != nil { return rerror.ErrInternalBy(err) } - project := t.conf.GCPProject - region := t.conf.GCPRegion + project := conf.GCPProject + region := conf.GCPRegion machineType := "" - if v := t.conf.DecompressorMachineType; v != "" && v != "default" { + if v := conf.DecompressorMachineType; v != "" && v != "default" { machineType = v } var diskSizeGb int64 - if v := t.conf.DecompressorDiskSideGb; v > 0 { + if v := conf.DecompressorDiskSideGb; v > 0 { diskSizeGb = v } else { diskSizeGb = defaultDiskSizeGb @@ -109,11 +116,11 @@ func (t *TaskRunner) runCloudBuild(ctx context.Context, p task.Payload) error { QueueTtl: "86400s", // 1 day Steps: []*cloudbuild.BuildStep{ { - Name: t.conf.DecompressorImage, - Args: []string{"-v", "-n=192", "-gc=5000", "-chunk=1m", "-disk-limit=20g", "-gzip-ext=" + t.conf.DecompressorGzipExt, "-skip-top", "-old-windows", src, dest}, + Name: conf.DecompressorImage, + Args: []string{"-v", "-n=192", "-gc=5000", "-chunk=1m", "-disk-limit=20g", "-gzip-ext=" + conf.DecompressorGzipExt, "-skip-top", "-old-windows", src, dest}, Env: []string{ "GOOGLE_CLOUD_PROJECT=" + project, - "REEARTH_CMS_DECOMPRESSOR_TOPIC=" + t.conf.DecompressorTopic, + "REEARTH_CMS_DECOMPRESSOR_TOPIC=" + conf.DecompressorTopic, "REEARTH_CMS_DECOMPRESSOR_ASSET_ID=" + p.DecompressAsset.AssetID, }, }, @@ -137,6 +144,59 @@ func (t *TaskRunner) runCloudBuild(ctx context.Context, p task.Payload) error { return nil } +func copy(ctx context.Context, p task.Payload, conf *TaskConfig) error { + if !p.Copy.Validate() { + return nil + } + + cb, err := cloudbuild.NewService(ctx) + if err != nil { + return rerror.ErrInternalBy(err) + } + + project := conf.GCPProject + region := conf.GCPRegion + + build := &cloudbuild.Build{ + Timeout: "86400s", // 1 day + QueueTtl: "86400s", // 1 day + Steps: []*cloudbuild.BuildStep{ + { + Name: conf.CopierImage, + Args: []string{ + "-v", // Enables verbose mode for logging. + "-n=192", // Specifies a numerical configuration, possibly a limit or count (e.g., number of threads, requests, etc.). + "-gc=5000", // Configures garbage collection or memory management to a threshold of 5000 units. + "-chunk=1m", // Sets a chunk size of 1 megabyte for processing or data transfer. + "-disk-limit=20g", // Limits the disk usage to 20 gigabytes. + "-skip-top", // Enables an option to skip certain data or processing steps (e.g., skipping the "top" of a hierarchy). + "-old-windows", // Activates compatibility or a specific mode for older Windows environments. + }, + Env: []string{ + "REEARTH_CMS_COPIER_COLLECTION=" + p.Copy.Collection, + "REEARTH_CMS_COPIER_FILTER=" + p.Copy.Filter, + "REEARTH_CMS_COPIER_CHANGES=" + p.Copy.Changes, + }, + }, + }, + Options: &cloudbuild.BuildOptions{ + DiskSizeGb: defaultDiskSizeGb, + }, + } + + if region != "" { + call := cb.Projects.Locations.Builds.Create(path.Join("projects", project, "locations", region), build) + _, err = call.Do() + } else { + call := cb.Projects.Builds.Create(project, build) + _, err = call.Do() + } + if err != nil { + return rerror.ErrInternalBy(err) + } + return nil +} + func (t *TaskRunner) runPubSub(ctx context.Context, p task.Payload) error { if p.Webhook == nil { return nil diff --git a/server/internal/infrastructure/gcp/taskrunner_test.go b/server/internal/infrastructure/gcp/taskrunner_test.go index bce3571f83..47ecf3c9d4 100644 --- a/server/internal/infrastructure/gcp/taskrunner_test.go +++ b/server/internal/infrastructure/gcp/taskrunner_test.go @@ -14,7 +14,8 @@ func TestTaskRunner(t *testing.T) { gcsBucket := "" gcpProject := "" gcpRegion := "" - image := "" + decompressorImage := "" + copierImage := "" if assetID == "" || path == "" || gcsBucket == "" || gcpProject == "" || gcpRegion == "" { t.Skip("assetID, path, gcsBucket, gcpProject, gcpRegion must be set") @@ -32,9 +33,10 @@ func TestTaskRunner(t *testing.T) { GCPProject: gcpProject, GCPRegion: gcpRegion, GCSBucket: gcsBucket, - DecompressorImage: image, + DecompressorImage: decompressorImage, DecompressorTopic: "decompress", DecompressorGzipExt: "gml", + CopierImage: copierImage, }) require.NoError(t, err) diff --git a/server/internal/usecase/interactor/model.go b/server/internal/usecase/interactor/model.go index 11ff9adbd5..1ff8983477 100644 --- a/server/internal/usecase/interactor/model.go +++ b/server/internal/usecase/interactor/model.go @@ -2,7 +2,9 @@ package interactor import ( "context" + "encoding/json" "errors" + "fmt" "github.com/reearth/reearth-cms/server/internal/usecase" "github.com/reearth/reearth-cms/server/internal/usecase/gateway" @@ -11,10 +13,13 @@ import ( "github.com/reearth/reearth-cms/server/pkg/id" "github.com/reearth/reearth-cms/server/pkg/model" "github.com/reearth/reearth-cms/server/pkg/schema" + "github.com/reearth/reearth-cms/server/pkg/task" "github.com/reearth/reearthx/i18n" + "github.com/reearth/reearthx/log" "github.com/reearth/reearthx/rerror" "github.com/reearth/reearthx/usecasex" "github.com/samber/lo" + "go.mongodb.org/mongo-driver/bson" ) type Model struct { @@ -313,3 +318,146 @@ func (i Model) UpdateOrder(ctx context.Context, ids id.ModelIDList, operator *us return ordered, nil }) } + +func (i Model) Copy(ctx context.Context, params interfaces.CopyModelParam, operator *usecase.Operator) (*model.Model, error) { + return Run1(ctx, operator, i.repos, Usecase().Transaction(), + func(ctx context.Context) (*model.Model, error) { + // copy model + oldModel, newModel, err := i.copyModel(ctx, params, operator) + if err != nil { + return nil, err + } + // copy schema + if err := i.copySchema(ctx, oldModel.Schema(), newModel.Schema()); err != nil { + return nil, err + } + // copy items + if err := i.copyItems(ctx, oldModel.Schema(), newModel.Schema(), newModel.ID()); err != nil { + return nil, err + } + // copy metadata + if oldModel.Metadata() != nil { + // copy meta schema + newMetaSchema, err := i.copyMetaSchema(ctx, *oldModel.Metadata(), newModel) + if err != nil { + return nil, err + } + // copy meta items + if err := i.copyItems(ctx, *oldModel.Metadata(), newMetaSchema.ID(), newModel.ID()); err != nil { + return nil, err + } + } + // return the new model + return newModel, nil + }) +} + +func (i Model) copyModel(ctx context.Context, params interfaces.CopyModelParam, operator *usecase.Operator) (*model.Model, *model.Model, error) { + oldModel, err := i.repos.Model.FindByID(ctx, params.ModelId) + if err != nil { + return nil, nil, err + } + name := lo.ToPtr(oldModel.Name() + " Copy") + if params.Name != nil { + name = params.Name + } + key := id.RandomKey().Ref().StringRef() + if params.Key != nil { + key = params.Key + } + newModel, err := i.Create(ctx, interfaces.CreateModelParam{ + ProjectId: oldModel.Project(), + Name: name, + Description: lo.ToPtr(oldModel.Description()), + Key: key, + Public: lo.ToPtr(oldModel.Public()), + }, operator) + if err != nil { + return nil, nil, err + } + return oldModel, newModel, nil +} + +func (i Model) copySchema(ctx context.Context, oldSchemaId, newSchemaId id.SchemaID) error { + oldSchema, err := i.repos.Schema.FindByID(ctx, oldSchemaId) + if err != nil { + return err + } + newSchema, err := i.repos.Schema.FindByID(ctx, newSchemaId) + if err != nil { + return err + } + newSchema.CopyFrom(oldSchema) + return i.repos.Schema.Save(ctx, newSchema) +} + +func (i Model) copyMetaSchema(ctx context.Context, oldMetaSchemaId id.SchemaID, newModel *model.Model) (*schema.Schema, error) { + oldMetaSchema, err := i.repos.Schema.FindByID(ctx, oldMetaSchemaId) + if err != nil { + return nil, err + } + newMetaSchema, err := schema.New(). + NewID(). + Workspace(oldMetaSchema.Workspace()). + Project(oldMetaSchema.Project()). + TitleField(nil). + Build() + if err != nil { + return nil, err + } + newMetaSchema.CopyFrom(oldMetaSchema) + newModel.SetMetadata(newMetaSchema.ID()) + if err := i.repos.Model.Save(ctx, newModel); err != nil { + return nil, err + } + if err := i.repos.Schema.Save(ctx, newMetaSchema); err != nil { + return nil, err + } + return newMetaSchema, nil +} + +func (i Model) copyItems(ctx context.Context, oldSchemaID, newSchemaID id.SchemaID, newModelID id.ModelID) error { + collection := "item" + filter, err := json.Marshal(bson.M{"schema": oldSchemaID.String()}) + if err != nil { + return err + } + changes, err := json.Marshal(task.Changes{ + "id": { + Type: task.ChangeTypeNew, + Value: "item", + }, + "schema": { + Type: task.ChangeTypeSet, + Value: newSchemaID.String(), + }, + "modelid": { + Type: task.ChangeTypeSet, + Value: newModelID.String(), + }, + }) + if err != nil { + return err + } + return i.triggerCopyEvent(ctx, collection, string(filter), string(changes)) +} + +func (i Model) triggerCopyEvent(ctx context.Context, collection, filter, changes string) error { + if i.gateways.TaskRunner == nil { + log.Infof("model: copy of %s skipped because task runner is not configured", collection) + return nil + } + + taskPayload := task.CopyPayload{ + Collection: collection, + Filter: filter, + Changes: changes, + } + + if err := i.gateways.TaskRunner.Run(ctx, taskPayload.Payload()); err != nil { + return fmt.Errorf("failed to trigger copy event: %w", err) + } + + log.Infof("model: successfully triggered copy event for collection %s, filter: %s, changes: %s", collection, filter, changes) + return nil +} diff --git a/server/internal/usecase/interactor/model_test.go b/server/internal/usecase/interactor/model_test.go index 9c412572f8..171ee4f635 100644 --- a/server/internal/usecase/interactor/model_test.go +++ b/server/internal/usecase/interactor/model_test.go @@ -2,16 +2,22 @@ package interactor import ( "context" + "errors" "testing" "time" + "github.com/golang/mock/gomock" "github.com/reearth/reearth-cms/server/internal/infrastructure/memory" "github.com/reearth/reearth-cms/server/internal/usecase" + "github.com/reearth/reearth-cms/server/internal/usecase/gateway" + "github.com/reearth/reearth-cms/server/internal/usecase/gateway/gatewaymock" "github.com/reearth/reearth-cms/server/internal/usecase/interfaces" "github.com/reearth/reearth-cms/server/internal/usecase/repo" "github.com/reearth/reearth-cms/server/pkg/id" "github.com/reearth/reearth-cms/server/pkg/model" "github.com/reearth/reearth-cms/server/pkg/project" + "github.com/reearth/reearth-cms/server/pkg/schema" + "github.com/reearth/reearthx/account/accountdomain" "github.com/reearth/reearthx/account/accountdomain/user" "github.com/reearth/reearthx/account/accountusecase" "github.com/reearth/reearthx/rerror" @@ -530,3 +536,109 @@ func TestNewModel(t *testing.T) { }) } } + +func TestModel_Copy(t *testing.T) { + mockTime := time.Now() + wid := accountdomain.NewWorkspaceID() + p := project.New().NewID().Workspace(wid).MustBuild() + op := &usecase.Operator{OwningProjects: []id.ProjectID{p.ID()}} + + fId1 := id.NewFieldID() + sfKey1 := id.RandomKey() + sf1 := schema.NewField(schema.NewBool().TypeProperty()).ID(fId1).Key(sfKey1).MustBuild() + s1 := schema.New().NewID().Workspace(wid).Project(p.ID()).Fields([]*schema.Field{sf1}).MustBuild() + fId2 := id.NewFieldID() + sfKey2 := id.RandomKey() + sf2 := schema.NewField(schema.NewBool().TypeProperty()).ID(fId2).Key(sfKey2).MustBuild() + s2 := schema.New().NewID().Workspace(wid).Project(p.ID()).Fields([]*schema.Field{sf2}).MustBuild() + m := model.New().NewID().Key(id.RandomKey()).Project(p.ID()).Schema(s1.ID()).Metadata(s2.ID().Ref()).MustBuild() + + ctx := context.Background() + db := memory.New() + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + + mRunner := gatewaymock.NewMockTaskRunner(mockCtrl) + gw := &gateway.Container{TaskRunner: mRunner} + u := NewModel(db, gw) + + defer memory.MockNow(db, mockTime)() + + err := db.Project.Save(ctx, p.Clone()) + assert.NoError(t, err) + err = db.Model.Save(ctx, m.Clone()) + assert.NoError(t, err) + err = db.Schema.Save(ctx, s1.Clone()) + assert.NoError(t, err) + err = db.Schema.Save(ctx, s2.Clone()) + assert.NoError(t, err) + + tests := []struct { + name string + param interfaces.CopyModelParam + setupMock func() + wantErr bool + validate func(t *testing.T, got *model.Model) + }{ + { + name: "successful copy", + param: interfaces.CopyModelParam{ + ModelId: m.ID(), + Name: lo.ToPtr("Copied Model"), + }, + setupMock: func() { + mRunner.EXPECT().Run(ctx, gomock.Any()).Times(1).Return(nil) + }, + wantErr: false, + validate: func(t *testing.T, got *model.Model) { + assert.NotEqual(t, m.ID(), got.ID()) + assert.NotEqual(t, m.Key(), got.Key()) + assert.Equal(t, "Copied Model", got.Name()) + assert.Equal(t, m.Description(), got.Description()) + assert.Equal(t, m.Public(), got.Public()) + }, + }, + { + name: "missing model ID", + param: interfaces.CopyModelParam{ + ModelId: id.ModelID{}, + Name: lo.ToPtr("Copied Model"), + }, + setupMock: func() { + mRunner.EXPECT().Run(ctx, gomock.Any()).Times(0) + }, + wantErr: true, + validate: func(t *testing.T, got *model.Model) { + assert.Nil(t, got) + }, + }, + { + name: "task runner error", + param: interfaces.CopyModelParam{ + ModelId: m.ID(), + Name: lo.ToPtr("Copied Model"), + }, + setupMock: func() { + mRunner.EXPECT().Run(ctx, gomock.Any()).Times(1).Return(errors.New("task runner error")) + }, + wantErr: true, + validate: func(t *testing.T, got *model.Model) { + assert.Nil(t, got) + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tt.setupMock() + got, err := u.Copy(ctx, tt.param, op) + if tt.wantErr { + assert.Error(t, err) + tt.validate(t, nil) + } else { + assert.NoError(t, err) + tt.validate(t, got) + } + }) + } +} diff --git a/server/internal/usecase/interfaces/model.go b/server/internal/usecase/interfaces/model.go index 23f00d1dc2..fd7e32fb4e 100644 --- a/server/internal/usecase/interfaces/model.go +++ b/server/internal/usecase/interfaces/model.go @@ -20,6 +20,12 @@ type CreateModelParam struct { Public *bool } +type CopyModelParam struct { + ModelId id.ModelID + Name *string + Key *string +} + type FindOrCreateSchemaParam struct { ModelID *id.ModelID GroupID *id.GroupID @@ -56,4 +62,5 @@ type Model interface { CheckKey(context.Context, id.ProjectID, string) (bool, error) Delete(context.Context, id.ModelID, *usecase.Operator) error Publish(context.Context, id.ModelID, bool, *usecase.Operator) (bool, error) + Copy(context.Context, CopyModelParam, *usecase.Operator) (*model.Model, error) } diff --git a/server/pkg/integrationapi/types.gen.go b/server/pkg/integrationapi/types.gen.go index ac4daf94f9..76a6a0ea28 100644 --- a/server/pkg/integrationapi/types.gen.go +++ b/server/pkg/integrationapi/types.gen.go @@ -535,11 +535,11 @@ type RefOrVersionRef string // Schema defines model for schema. type Schema struct { - TitleField *id.FieldID `json:"TitleField,omitempty"` CreatedAt *time.Time `json:"createdAt,omitempty"` Fields *[]SchemaField `json:"fields,omitempty"` Id *id.SchemaID `json:"id,omitempty"` ProjectId *id.ProjectID `json:"projectId,omitempty"` + TitleField *id.FieldID `json:"titleField,omitempty"` } // SchemaField defines model for schemaField. @@ -705,6 +705,12 @@ type ModelUpdateJSONBody struct { Name *string `json:"name,omitempty"` } +// CopyModelJSONBody defines parameters for CopyModel. +type CopyModelJSONBody struct { + Key *string `json:"key,omitempty"` + Name *string `json:"name,omitempty"` +} + // ModelImportJSONBody defines parameters for ModelImport. type ModelImportJSONBody struct { AssetId id.AssetID `json:"assetId"` @@ -1022,6 +1028,9 @@ type ItemCommentUpdateJSONRequestBody ItemCommentUpdateJSONBody // ModelUpdateJSONRequestBody defines body for ModelUpdate for application/json ContentType. type ModelUpdateJSONRequestBody ModelUpdateJSONBody +// CopyModelJSONRequestBody defines body for CopyModel for application/json ContentType. +type CopyModelJSONRequestBody CopyModelJSONBody + // ModelImportJSONRequestBody defines body for ModelImport for application/json ContentType. type ModelImportJSONRequestBody ModelImportJSONBody diff --git a/server/pkg/schema/schema.go b/server/pkg/schema/schema.go index caf8915348..fa16625bb1 100644 --- a/server/pkg/schema/schema.go +++ b/server/pkg/schema/schema.go @@ -172,3 +172,11 @@ func (s *Schema) IsPointFieldSupported() bool { } return false } + +func (s *Schema) CopyFrom(s2 *Schema) { + if s == nil || s2 == nil { + return + } + s.fields = slices.Clone(s2.fields) + s.titleField = s2.TitleField().CloneRef() +} diff --git a/server/pkg/schema/schema_test.go b/server/pkg/schema/schema_test.go index a3c6275155..7bb6cd1db4 100644 --- a/server/pkg/schema/schema_test.go +++ b/server/pkg/schema/schema_test.go @@ -479,3 +479,17 @@ func TestSchema_HasFieldByKey(t *testing.T) { }) } } + +func TestSchema_CopyFrom(t *testing.T) { + fid := id.NewFieldID() + s1 := &Schema{id: id.NewSchemaID(), fields: []*Field{{id: id.NewFieldID(), name: "f1", key: id.RandomKey()}}, titleField: fid.Ref()} + s2 := &Schema{id: id.NewSchemaID(), fields: []*Field{}} + s2.CopyFrom(s1) + assert.Equal(t, s1.fields, s2.fields) + assert.Equal(t, s1.titleField, s2.titleField) + + s3 := &Schema{id: id.NewSchemaID(), fields: []*Field{}} + s3.CopyFrom(nil) + assert.Equal(t, 0, len(s3.fields)) + assert.Nil(t, s3.titleField) +} diff --git a/server/pkg/task/task.go b/server/pkg/task/task.go index 24d1caa2f4..3dab9097c3 100644 --- a/server/pkg/task/task.go +++ b/server/pkg/task/task.go @@ -9,6 +9,7 @@ type Payload struct { DecompressAsset *DecompressAssetPayload CompressAsset *CompressAssetPayload Webhook *WebhookPayload + Copy *CopyPayload } type DecompressAssetPayload struct { @@ -43,3 +44,31 @@ func (t WebhookPayload) Payload() Payload { Webhook: &t, } } + +type CopyPayload struct { + Collection string + Filter string + Changes string +} + +func (p *CopyPayload) Payload() Payload { + return Payload{ + Copy: p, + } +} + +func (p *CopyPayload) Validate() bool { + return p != nil && p.Changes != "" && p.Collection != "" && p.Filter != "" +} + +type Changes map[string]Change +type Change struct { + Type ChangeType + Value string +} +type ChangeType string + +const ( + ChangeTypeSet ChangeType = "set" + ChangeTypeNew ChangeType = "new" +) diff --git a/server/schemas/integration.yml b/server/schemas/integration.yml index 5f62f94e7a..924af2c8c2 100644 --- a/server/schemas/integration.yml +++ b/server/schemas/integration.yml @@ -449,6 +449,41 @@ paths: $ref: '#/components/responses/UnauthorizedError' '500': description: Internal server error + '/models/{modelId}/copy': + parameters: + - $ref: '#/components/parameters/modelIdParam' + post: + operationId: CopyModel + summary: Copy schema and items of a selected model + tags: + - Models + security: + - bearerAuth: [ ] + requestBody: + content: + application/json: + schema: + type: object + properties: + name: + type: string + key: + type: string + responses: + '200': + description: A JSON object of field + content: + application/json: + schema: + $ref: '#/components/schemas/model' + '400': + description: Invalid request parameter value + '401': + $ref: '#/components/responses/UnauthorizedError' + '404': + $ref: '#/components/responses/NotFoundError' + '500': + description: Internal server error '/schemata/{schemaId}/fields': parameters: - $ref: '#/components/parameters/schemaIdParam' @@ -1823,7 +1858,7 @@ components: type: array items: $ref: '#/components/schemas/schemaField' - TitleField: + titleField: x-go-type: id.FieldID type: string createdAt: diff --git a/worker/Dockerfile b/worker/Dockerfile index f3c36bed7c..d9f528b118 100644 --- a/worker/Dockerfile +++ b/worker/Dockerfile @@ -15,7 +15,6 @@ WORKDIR /app RUN go mod download COPY server/pkg/ /app/server/pkg/ -COPY server/internal/infrastructure/ /app/server/internal/infrastructure/ COPY worker/cmd/ /app/worker/cmd/ COPY worker/internal/ /app/worker/internal/ COPY worker/pkg/ /app/worker/pkg/ diff --git a/worker/cmd/copier/main.go b/worker/cmd/copier/main.go new file mode 100644 index 0000000000..10b5efdef1 --- /dev/null +++ b/worker/cmd/copier/main.go @@ -0,0 +1,70 @@ +package main + +import ( + "context" + "os" + "time" + + "github.com/joho/godotenv" + "github.com/reearth/reearth-cms/worker/internal/adapter/http" + rmongo "github.com/reearth/reearth-cms/worker/internal/infrastructure/mongo" + "github.com/reearth/reearth-cms/worker/internal/usecase/interactor" + "github.com/reearth/reearth-cms/worker/internal/usecase/repo" + "github.com/reearth/reearthx/log" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" + "go.opentelemetry.io/contrib/instrumentation/go.mongodb.org/mongo-driver/mongo/otelmongo" +) + +func main() { + log.Infof("reearth-cms/worker: copier has started") + ctx := context.Background() + + if err := godotenv.Load(".env"); err != nil && !os.IsNotExist(err) { + log.Fatal("config: unable to load .env") + } else if err == nil { + log.Infof("config: .env loaded") + } + + dbURI := mustGetEnv("REEARTH_CMS_WORKER_DB") + collection := mustGetEnv("REEARTH_CMS_COPIER_COLLECTION") + filter := mustGetEnv("REEARTH_CMS_COPIER_FILTER") + changes := mustGetEnv("REEARTH_CMS_COPIER_CHANGES") + + repos, err := initReposWithCollection(ctx, dbURI, collection) + if err != nil { + log.Fatalf("failed to initialize repositories with DB URI %s: %v", dbURI, err) + } + + uc := interactor.NewUsecase(nil, repos) + ctrl := http.NewCopyController(uc) + if err := ctrl.Copy(ctx, http.CopyInput{Filter: filter, Changes: changes}); err != nil { + log.Fatalf("copy operation failed: %v", err) + } +} + +func mustGetEnv(key string) string { + value := os.Getenv(key) + if value == "" { + log.Fatalf("environment variable %s is required", key) + } + return value +} + +func initReposWithCollection(ctx context.Context, dbURI, collection string) (*repo.Container, error) { + client, err := mongo.Connect( + ctx, + options.Client(). + ApplyURI(dbURI). + SetConnectTimeout(10*time.Second). + SetMonitor(otelmongo.NewMonitor()), + ) + if err != nil { + return nil, err + } + + db := client.Database("reearth_cms") + mongoCopier := rmongo.NewCopier(db) + mongoCopier.SetCollection(db.Collection(collection)) + return rmongo.New(ctx, nil, mongoCopier) +} diff --git a/worker/copier.Dockerfile b/worker/copier.Dockerfile new file mode 100644 index 0000000000..1072477d3a --- /dev/null +++ b/worker/copier.Dockerfile @@ -0,0 +1,8 @@ +FROM golang:1.23.3 AS build +WORKDIR /app +COPY . . +RUN CGO_ENABLED=0 go build ./cmd/copier + +FROM scratch +COPY --from=build /app/copier /copier +ENTRYPOINT ["/copier"] \ No newline at end of file diff --git a/worker/internal/adapter/http/copy.go b/worker/internal/adapter/http/copy.go new file mode 100644 index 0000000000..90541f3af5 --- /dev/null +++ b/worker/internal/adapter/http/copy.go @@ -0,0 +1,38 @@ +package http + +import ( + "context" + "encoding/json" + + "github.com/reearth/reearth-cms/server/pkg/task" + "github.com/reearth/reearth-cms/worker/internal/usecase/interactor" + "github.com/reearth/reearthx/rerror" + "go.mongodb.org/mongo-driver/bson" +) + +type CopyController struct { + usecase *interactor.Usecase +} + +func NewCopyController(u *interactor.Usecase) *CopyController { + return &CopyController{ + usecase: u, + } +} + +type CopyInput struct { + Filter string `json:"filter"` + Changes string `json:"changes"` +} + +func (c *CopyController) Copy(ctx context.Context, input CopyInput) error { + var filter bson.M + if err := json.Unmarshal([]byte(input.Filter), &filter); err != nil { + return rerror.ErrInternalBy(err) + } + var changes task.Changes + if err := json.Unmarshal([]byte(input.Changes), &changes); err != nil { + return rerror.ErrInternalBy(err) + } + return c.usecase.Copy(ctx, filter, changes) +} diff --git a/worker/internal/adapter/http/main.go b/worker/internal/adapter/http/main.go index 01434495c8..3e028f8b12 100644 --- a/worker/internal/adapter/http/main.go +++ b/worker/internal/adapter/http/main.go @@ -5,10 +5,13 @@ import "github.com/reearth/reearth-cms/worker/internal/usecase/interactor" type Controller struct { DecompressController *DecompressController WebhookController *WebhookController + CopyController *CopyController } func NewController(uc *interactor.Usecase) *Controller { - return &Controller{DecompressController: NewDecompressController(uc), - WebhookController: NewWebhookController(uc), + return &Controller{ + DecompressController: NewDecompressController(uc), + WebhookController: NewWebhookController(uc), + CopyController: NewCopyController(uc), } } diff --git a/worker/internal/app/main.go b/worker/internal/app/main.go index 6e58d3021c..8dc173c381 100644 --- a/worker/internal/app/main.go +++ b/worker/internal/app/main.go @@ -15,6 +15,7 @@ import ( rmongo "github.com/reearth/reearth-cms/worker/internal/infrastructure/mongo" "github.com/reearth/reearth-cms/worker/internal/usecase/gateway" "github.com/reearth/reearth-cms/worker/internal/usecase/interactor" + "github.com/reearth/reearth-cms/worker/internal/usecase/repo" "github.com/reearth/reearthx/log" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" @@ -44,12 +45,16 @@ func Start(debug bool, version string) { } mongoWebhook := rmongo.NewWebhook(client.Database("reearth_cms")) lo.Must0(mongoWebhook.InitIndex(ctx)) + repos, err := rmongo.New(ctx, mongoWebhook, nil) + if err != nil { + log.Fatalf("repo initialization error: %+v\n", err) + } // gateways gateways := initReposAndGateways(ctx, conf, debug) // usecase - uc := interactor.NewUsecase(gateways, mongoWebhook) + uc := interactor.NewUsecase(gateways, repos) ctrl := rhttp.NewController(uc) handler := NewHandler(ctrl) @@ -58,6 +63,7 @@ func Start(debug bool, version string) { Config: conf, Debug: debug, Gateways: gateways, + Repos: repos, }, handler).Run(ctx) } @@ -70,6 +76,7 @@ type ServerConfig struct { Config *Config Debug bool Gateways *gateway.Container + Repos *repo.Container } func NewServer(ctx context.Context, cfg *ServerConfig, handler *Handler) *WebServer { diff --git a/worker/internal/infrastructure/mongo/common_test.go b/worker/internal/infrastructure/mongo/common_test.go new file mode 100644 index 0000000000..766e966fbc --- /dev/null +++ b/worker/internal/infrastructure/mongo/common_test.go @@ -0,0 +1,7 @@ +package mongo + +import "github.com/reearth/reearthx/mongox/mongotest" + +func init() { + mongotest.Env = "REEARTH_CMS_WORKER_DB" +} diff --git a/worker/internal/infrastructure/mongo/container.go b/worker/internal/infrastructure/mongo/container.go new file mode 100644 index 0000000000..5b57c2b015 --- /dev/null +++ b/worker/internal/infrastructure/mongo/container.go @@ -0,0 +1,34 @@ +package mongo + +import ( + "context" + "errors" + + "github.com/reearth/reearth-cms/worker/internal/usecase/repo" +) + +func New(ctx context.Context, webhook *Webhook, copier *Copier) (*repo.Container, error) { + r := &repo.Container{ + Webhook: webhook, + Copier: copier, + } + + // init + if err := Init(r); err != nil { + return nil, err + } + + return r, nil +} + +func Init(r *repo.Container) error { + if r == nil { + return nil + } + + if r.Webhook == nil && r.Copier == nil { + return errors.New("invalid repository container") + } + + return nil +} diff --git a/worker/internal/infrastructure/mongo/copier.go b/worker/internal/infrastructure/mongo/copier.go new file mode 100644 index 0000000000..a7ff841434 --- /dev/null +++ b/worker/internal/infrastructure/mongo/copier.go @@ -0,0 +1,97 @@ +package mongo + +import ( + "context" + "errors" + + "github.com/reearth/reearth-cms/server/pkg/id" + "github.com/reearth/reearth-cms/server/pkg/task" + "github.com/reearth/reearth-cms/worker/internal/usecase/repo" + "github.com/reearth/reearthx/log" + "github.com/reearth/reearthx/rerror" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/bson/primitive" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" +) + +// TODO: this should be taken from config +var batchSize int32 = 1000 + +var _ repo.Copier = (*Copier)(nil) + +type Copier struct { + c *mongo.Collection +} + +func NewCopier(_ *mongo.Database) *Copier { + return &Copier{} +} + +func (r *Copier) SetCollection(collection *mongo.Collection) { + r.c = collection +} + +func (r *Copier) Copy(ctx context.Context, f bson.M, changesMap task.Changes) error { + options := options.Find().SetBatchSize(batchSize) + cursor, err := r.c.Find(ctx, f, options) + if err != nil { + if errors.Is(err, mongo.ErrNilDocument) || errors.Is(err, mongo.ErrNoDocuments) { + return rerror.ErrNotFound + } + return rerror.ErrInternalBy(err) + } + defer cursor.Close(ctx) + + var bulkModels []mongo.WriteModel + for cursor.Next(ctx) { + if err := cursor.Err(); err != nil { + return rerror.ErrInternalBy(err) + } + + var result bson.M + if err := cursor.Decode(&result); err != nil { + return rerror.ErrInternalBy(err) + } + result["_id"] = primitive.NewObjectID() + + for k, change := range changesMap { + switch change.Type { + case task.ChangeTypeNew: + newId, _ := generateId(change.Value) + result[k] = newId + case task.ChangeTypeSet: + result[k] = change.Value + } + } + + bulkModels = append(bulkModels, mongo.NewInsertOneModel().SetDocument(result)) + } + + if err := cursor.Close(ctx); err != nil { + return rerror.ErrInternalBy(err) + } + + if len(bulkModels) > 0 { + _, err = r.c.BulkWrite(ctx, bulkModels) + if err != nil { + return rerror.ErrInternalBy(err) + } + } + log.Infof("reearth-cms/worker: all data has been successfully copied!") + + return nil +} + +func generateId(t string) (string, bool) { + switch t { + case "item": + return id.NewAssetID().String(), true + case "schema": + return id.NewSchemaID().String(), true + case "model": + return id.NewModelID().String(), true + default: + return "", false + } +} diff --git a/worker/internal/infrastructure/mongo/copier_test.go b/worker/internal/infrastructure/mongo/copier_test.go new file mode 100644 index 0000000000..cb87891076 --- /dev/null +++ b/worker/internal/infrastructure/mongo/copier_test.go @@ -0,0 +1,98 @@ +package mongo + +import ( + "context" + "testing" + + "github.com/reearth/reearth-cms/server/pkg/id" + "github.com/reearth/reearth-cms/server/pkg/item" + "github.com/reearth/reearth-cms/server/pkg/task" + "github.com/reearth/reearthx/mongox/mongotest" + "github.com/stretchr/testify/assert" + "go.mongodb.org/mongo-driver/bson" +) + +func TestCopier_SetCollection(t *testing.T) { + db := mongotest.Connect(t)(t) + w := NewCopier(db) + w.SetCollection(db.Collection("item")) + assert.Equal(t, "item", w.c.Name()) +} + +func TestCopier_Copy(t *testing.T) { + ctx := context.Background() + db := mongotest.Connect(t)(t) + w := NewCopier(db) + iCol := db.Collection("item") + w.SetCollection(iCol) + + mid1 := id.NewModelID() + sid1 := id.NewSchemaID() + mid2 := id.NewModelID() + sid2 := id.NewSchemaID() + i1 := item.New().ID(id.NewItemID()).Schema(sid1).Model(mid1).Project(id.NewProjectID()).Thread(id.NewThreadID()).MustBuild() + i2 := item.New().ID(id.NewItemID()).Schema(sid1).Model(mid1).Project(id.NewProjectID()).Thread(id.NewThreadID()).MustBuild() + + _, err := iCol.InsertMany(ctx, []any{i1, i2}) + assert.NoError(t, err) + + filter := bson.M{"schema": sid1.String()} + changes := task.Changes{ + "id": { + Type: task.ChangeTypeNew, + Value: "item", + }, + "schema": { + Type: task.ChangeTypeSet, + Value: sid2.String(), + }, + "model": { + Type: task.ChangeTypeSet, + Value: mid2.String(), + }, + } + + err = w.Copy(ctx, filter, changes) + assert.NoError(t, err) +} + +func TestCopier_GenerateId(t *testing.T) { + tests := []struct { + name string + input string + expectOk bool + }{ + { + name: "Valid input - item", + input: "item", + expectOk: true, + }, + { + name: "Valid input - schema", + input: "schema", + expectOk: true, + }, + { + name: "Valid input - model", + input: "model", + expectOk: true, + }, + { + name: "Invalid input - unknown type", + input: "unknown", + expectOk: false, + }, + { + name: "Empty input", + input: "", + expectOk: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + _, ok := generateId(tt.input) + assert.Equal(t, tt.expectOk, ok) + }) + } +} diff --git a/worker/internal/infrastructure/mongo/webhook_test.go b/worker/internal/infrastructure/mongo/webhook_test.go index 394e348e47..9415bf318a 100644 --- a/worker/internal/infrastructure/mongo/webhook_test.go +++ b/worker/internal/infrastructure/mongo/webhook_test.go @@ -8,10 +8,6 @@ import ( "github.com/stretchr/testify/assert" ) -func init() { - mongotest.Env = "REEARTH_CMS_WORKER_DB" -} - func TestWebhook(t *testing.T) { ctx := context.Background() db := mongotest.Connect(t)(t) diff --git a/worker/internal/usecase/interactor/copier.go b/worker/internal/usecase/interactor/copier.go new file mode 100644 index 0000000000..abf56d6ae1 --- /dev/null +++ b/worker/internal/usecase/interactor/copier.go @@ -0,0 +1,12 @@ +package interactor + +import ( + "context" + + "github.com/reearth/reearth-cms/server/pkg/task" + "go.mongodb.org/mongo-driver/bson" +) + +func (u *Usecase) Copy(ctx context.Context, filter bson.M, changes task.Changes) error { + return u.repos.Copier.Copy(ctx, filter, changes) +} diff --git a/worker/internal/usecase/interactor/usecase.go b/worker/internal/usecase/interactor/usecase.go index 81734caa70..7b56b65afe 100644 --- a/worker/internal/usecase/interactor/usecase.go +++ b/worker/internal/usecase/interactor/usecase.go @@ -7,9 +7,12 @@ import ( type Usecase struct { gateways *gateway.Container - webhook repo.Webhook + repos *repo.Container } -func NewUsecase(g *gateway.Container, webhook repo.Webhook) *Usecase { - return &Usecase{gateways: g, webhook: webhook} +func NewUsecase(g *gateway.Container, r *repo.Container) *Usecase { + return &Usecase{ + gateways: g, + repos: r, + } } diff --git a/worker/internal/usecase/interactor/webhook.go b/worker/internal/usecase/interactor/webhook.go index 3f0a2530b3..1a54af007f 100644 --- a/worker/internal/usecase/interactor/webhook.go +++ b/worker/internal/usecase/interactor/webhook.go @@ -10,7 +10,7 @@ import ( func (u *Usecase) SendWebhook(ctx context.Context, w *webhook.Webhook) error { eid := fmt.Sprintf("%s_%s", w.EventID, w.WebhookID) - found, err := u.webhook.GetAndSet(ctx, eid) + found, err := u.repos.Webhook.GetAndSet(ctx, eid) if err != nil { log.Errorf("webhook usecase: failed to get webhook sent: %v", err) } @@ -22,7 +22,7 @@ func (u *Usecase) SendWebhook(ctx context.Context, w *webhook.Webhook) error { if err := webhook.Send(ctx, w); err != nil { log.Errorf("webhook usecase: error response: %v", err) - if err2 := u.webhook.Delete(ctx, eid); err2 != nil { + if err2 := u.repos.Webhook.Delete(ctx, eid); err2 != nil { log.Errorf("webhook usecase: failed to set webhook sent: %v", err2) } return err diff --git a/worker/internal/usecase/repo/container.go b/worker/internal/usecase/repo/container.go new file mode 100644 index 0000000000..65b795bd29 --- /dev/null +++ b/worker/internal/usecase/repo/container.go @@ -0,0 +1,6 @@ +package repo + +type Container struct { + Webhook Webhook + Copier Copier +} diff --git a/worker/internal/usecase/repo/copier.go b/worker/internal/usecase/repo/copier.go new file mode 100644 index 0000000000..086e7ccf25 --- /dev/null +++ b/worker/internal/usecase/repo/copier.go @@ -0,0 +1,12 @@ +package repo + +import ( + "context" + + "github.com/reearth/reearth-cms/server/pkg/task" + "go.mongodb.org/mongo-driver/bson" +) + +type Copier interface { + Copy(context.Context, bson.M, task.Changes) error +}