Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[REBASE #135] Add batched migration script #168

Open
wants to merge 6 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions scripts/MySql/1_Migration_Setup.sql
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ CREATE TABLE IF NOT EXISTS tags(
PRIMARY KEY (ordering_id, tag, persistence_id)
);

DROP PROCEDURE IF EXISTS Split;
DROP PROCEDURE IF EXISTS AkkaMigration_Split;

DELIMITER ??
CREATE PROCEDURE Split()
CREATE PROCEDURE AkkaMigration_Split()
BEGIN

DECLARE v_cursor_done TINYINT UNSIGNED DEFAULT 0;
Expand Down
2 changes: 1 addition & 1 deletion scripts/MySql/2_Migration.sql
Original file line number Diff line number Diff line change
@@ -1 +1 @@
CALL Split();
CALL AkkaMigration_Split();
2 changes: 1 addition & 1 deletion scripts/MySql/3_Post_Migration_Cleanup.sql
Original file line number Diff line number Diff line change
@@ -1 +1 @@
DROP PROCEDURE IF EXISTS Split;
DROP PROCEDURE IF EXISTS AkkaMigration_Split;
91 changes: 91 additions & 0 deletions scripts/MySql/Batched/1_Migration_Setup.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
CREATE TABLE IF NOT EXISTS tags(
ordering_id BIGINT NOT NULL,
tag NVARCHAR(64) NOT NULL,
sequence_nr BIGINT NOT NULL,
persistence_id VARCHAR(255),
PRIMARY KEY (ordering_id, tag, persistence_id)
);

DROP PROCEDURE IF EXISTS AkkaMigration_Split;

DELIMITER ??
CREATE PROCEDURE AkkaMigration_Split(IN fromId INT, IN toId INT)
BEGIN

DECLARE v_cursor_done TINYINT UNSIGNED DEFAULT 0;
DECLARE Id INT UNSIGNED;
DECLARE String VARCHAR(8000);
DECLARE PId VARCHAR(255);
DECLARE SeqNr INT UNSIGNED;
DECLARE idx INT UNSIGNED;
DECLARE slice VARCHAR(8000);

DECLARE v_cursor CURSOR FOR
SELECT ej.`ordering`, ej.tags
FROM event_journal ej
WHERE ej.`ordering` >= fromId AND ej.`ordering` <= toId
ORDER BY ej.`ordering`;
DECLARE CONTINUE HANDLER FOR NOT FOUND
SET v_cursor_done = 1;

OPEN v_cursor;
REPEAT
FETCH v_cursor INTO Id, String, SeqNr, PId;
SET idx = 1;

IF String IS NULL OR LENGTH(String) < 1 THEN
SET idx = 0;
END IF;

WHILE idx != 0 DO
SET idx = LOCATE(';', String);
IF idx != 0 THEN
SET slice = LEFT(String, idx - 1);
ELSE
SET slice = String;
END IF;

IF LENGTH(slice) > 0 THEN
INSERT IGNORE INTO tags (ordering_id, tag, sequence_nr, persistence_id) VALUES (Id, slice, SeqNr, PId);
END IF;

SET String = RIGHT(String, LENGTH(String) - idx);

IF LENGTH(String) = 0 THEN
SET idx = 0;
END IF;
END WHILE;
UNTIL v_cursor_done END REPEAT;

CLOSE v_cursor;

END??

CREATE PROCEDURE AkkaMigration_BatchedMigration(IN fromId BIGINT)
BEGIN
DECLARE maxId BIGINT UNSIGNED;
DECLARE oldCommitValue TINYINT DEFAULT @@autocommit;

SELECT maxId = MAX(ej.`ordering`)
FROM event_journal ej
WHERE
ej.`tags` IS NOT null
AND LENGTH(ej.`tags`) > 0
AND ej.`ordering` NOT IN (SELECT t.`ordering_id` FROM tags t);

loopLabel: LOOP
IF fromId > maxId THEN
LEAVE loopLabel;
END IF;

SET autocommit = 0;
START TRANSACTION;
CALL AkkaMigration_Split(fromId, fromId + 1000);
COMMIT;
SET autocommit = oldCommitValue;

SET fromId = fromId + 1000;
END LOOP;
END ??

DELIMITER ;
1 change: 1 addition & 0 deletions scripts/MySql/Batched/2_Migration.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
CALL AkkaMigration_BatchedMigration(0);
2 changes: 2 additions & 0 deletions scripts/MySql/Batched/3_Post_Migration_Cleanup.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
DROP PROCEDURE IF EXISTS AkkaMigration_Split;
DROP PROCEDURE IF EXISTS AkkaMigration_BatchedMigration;
3 changes: 3 additions & 0 deletions scripts/MySql/Batched/DROP_migration.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
DROP TABLE IF EXISTS TagTable;
DROP PROCEDURE IF EXISTS AkkaMigration_Split;
DROP PROCEDURE IF EXISTS AkkaMigration_BatchedMigration;
2 changes: 1 addition & 1 deletion scripts/MySql/DROP_migration.sql
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
DROP TABLE IF EXISTS TagTable;
DROP PROCEDURE IF EXISTS Split;
DROP PROCEDURE IF EXISTS AkkaMigration_Split;
10 changes: 5 additions & 5 deletions scripts/PostgreSql/1_Migration_Setup.sql
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ CREATE TABLE IF NOT EXISTS "public"."tags"(
PRIMARY KEY (ordering_id, tag, persistence_id)
);

CREATE OR REPLACE PROCEDURE "public"."Split"(id bigint, tags varchar(8000), seq_nr bigint, pid varchar(255)) AS $$
CREATE OR REPLACE PROCEDURE "public"."AkkaMigration_Split"(id bigint, tags varchar(8000), seq_nr bigint, pid varchar(255)) AS $$
DECLARE var_t record;
BEGIN
FOR var_t IN(SELECT unnest(string_to_array(tags, ';')) AS t)
Expand All @@ -19,12 +19,12 @@ BEGIN
END
$$ LANGUAGE plpgsql;

CREATE OR REPLACE PROCEDURE "public"."Normalize"() AS $$
CREATE OR REPLACE PROCEDURE "public"."AkkaMigration_Normalize"() AS $$
DECLARE var_r record;
BEGIN
FOR var_r IN(SELECT ej."ordering" AS id, ej."tags", ej."sequence_nr" as seq_nr, ej."persistence_id" AS pid FROM "public"."event_journal" AS ej ORDER BY "ordering")
LOOP
CALL "public"."Split"(var_r.id, var_r.tags, var_r.seq_nr, var_r.pid);
FOR var_r IN(SELECT ej."ordering" AS id, ej."tags", ej."sequence_nr" as seq_nr, ej."persistence_id" AS pid FROM "public"."event_journal" AS ej ORDER BY "ordering")
LOOP
CALL "public"."AkkaMigration_Split"(var_r.id, var_r.tags, var_r.seq_nr, var_r.pid);
END LOOP;
END
$$ LANGUAGE plpgsql;
2 changes: 1 addition & 1 deletion scripts/PostgreSql/2_Migration.sql
Original file line number Diff line number Diff line change
@@ -1 +1 @@
CALL "public"."Normalize"();
CALL "public"."AkkaMigration_Normalize"();
4 changes: 2 additions & 2 deletions scripts/PostgreSql/3_Post_Migration_Cleanup.sql
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
DROP PROCEDURE IF EXISTS "public"."Split";
DROP PROCEDURE IF EXISTS "public"."Normalize";
DROP PROCEDURE IF EXISTS "public"."AkkaMigration_Split";
DROP PROCEDURE IF EXISTS "public"."AkkaMigration_Normalize";
55 changes: 55 additions & 0 deletions scripts/PostgreSql/Batched/1_Migration_Setup.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
CREATE TABLE IF NOT EXISTS "public"."tags"(
ordering_id BIGINT NOT NULL,
tag VARCHAR(64) NOT NULL,
sequence_nr BIGINT NOT NULL,
persistence_id VARCHAR(255) NOT NULL,
PRIMARY KEY (ordering_id, tag)
);

CREATE OR REPLACE PROCEDURE "public"."AkkaMigration_Split"(id bigint, tags varchar(8000), seq_nr bigint, pid varchar(255)) AS $$
DECLARE var_t record;
BEGIN
FOR var_t IN(SELECT unnest(string_to_array(tags, ';')) AS t)
LOOP
CONTINUE WHEN var_t.t IS NULL OR var_t.t = '';
INSERT INTO "public"."tags" (ordering_id, tag, sequence_nr, persistence_id)
VALUES (id, var_t.t, seq_nr, pid)
ON CONFLICT DO NOTHING;
END LOOP;
END
$$ LANGUAGE plpgsql;

CREATE OR REPLACE PROCEDURE "public"."AkkaMigration_Normalize"(IN fromId BIGINT, IN toId BIGINT) AS $$
DECLARE var_r record;
BEGIN
FOR var_r IN(
SELECT ej."ordering" AS id, ej."tags", ej."sequence_nr" as seq_nr, ej."persistence_id" AS pid
FROM "public"."event_journal" AS ej
WHERE ej.ordering >= fromId AND ej.ordering <= toId
ORDER BY "ordering")
LOOP
CALL "public"."AkkaMigration_Split"(var_r.id, var_r.tags, var_r.seq_nr, var_r.pid);
END LOOP;
END
$$ LANGUAGE plpgsql;

CREATE OR REPLACE PROCEDURE "public"."AkkaMigration_BatchedMigration"(IN from_id BIGINT) AS $$
DECLARE max_id BIGINT;
BEGIN
max_id := (SELECT MAX(ej."ordering")
FROM event_journal ej
WHERE
ej."tags" IS NOT NULL
AND LENGTH(ej."tags") > 0
AND ej."ordering" NOT IN (SELECT t."ordering_id" FROM "public"."tags" t));

LOOP
EXIT WHEN from_id > max_id;

CALL "public"."AkkaMigration_Normalize"(from_id, from_id + 1000);
COMMIT;

from_id := from_id + 1000;
END LOOP;
END;
$$ LANGUAGE plpgsql;
1 change: 1 addition & 0 deletions scripts/PostgreSql/Batched/2_Migration.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
CALL "public"."AkkaMigration_BatchedMigration"(0);
3 changes: 3 additions & 0 deletions scripts/PostgreSql/Batched/3_Post_Migration_Cleanup.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
DROP PROCEDURE IF EXISTS "public"."AkkaMigration_Split";
DROP PROCEDURE IF EXISTS "public"."AkkaMigration_Normalize";
DROP PROCEDURE IF EXISTS "public"."AkkaMigration_BatchedMigration";
4 changes: 4 additions & 0 deletions scripts/PostgreSql/Batched/DROP_migration.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
DROP TABLE IF EXISTS "public"."TagTable";
DROP PROCEDURE IF EXISTS "public"."AkkaMigration_Split";
DROP PROCEDURE IF EXISTS "public"."AkkaMigration_Normalize";
DROP PROCEDURE IF EXISTS "public"."AkkaMigration_BatchedMigration";
4 changes: 2 additions & 2 deletions scripts/PostgreSql/DROP_migration.sql
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
DROP TABLE IF EXISTS "public"."TagTable";
DROP PROCEDURE IF EXISTS "public"."Split";
DROP PROCEDURE IF EXISTS "public"."Normalize";
DROP PROCEDURE IF EXISTS "public"."AkkaMigration_Split";
DROP PROCEDURE IF EXISTS "public"."AkkaMigration_Normalize";
2 changes: 1 addition & 1 deletion scripts/SqlServer/1_Migration_Setup.sql
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ BEGIN
END
GO

CREATE OR ALTER FUNCTION [dbo].[Split](@String VARCHAR(8000), @Delimiter CHAR(1))
CREATE OR ALTER FUNCTION [dbo].[AkkaMigration_Split](@String VARCHAR(8000), @Delimiter CHAR(1))
RETURNS @temptable TABLE (items VARCHAR(8000)) AS
BEGIN
DECLARE @idx INT
Expand Down
6 changes: 3 additions & 3 deletions scripts/SqlServer/2_Migration.sql
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
INSERT INTO [dbo].[tags]([ordering_id], [tag], [sequence_nr], [persistence_id])
SELECT * FROM (
SELECT a.[Ordering], b.[items], a.SequenceNr, a.PersistenceId FROM
[dbo].[EventJournal] AS a
CROSS APPLY [dbo].[Split](a.Tags, ';') b
SELECT records.[Ordering], cross_product.[items], records.SequenceNr, records.PersistenceId FROM
[dbo].[EventJournal] AS records
CROSS APPLY [dbo].[AkkaMigration_Split](records.Tags, ';') cross_product
) AS s([ordering_id], [tag], [sequence_nr], [persistence_id])
WHERE NOT EXISTS (
SELECT * FROM [dbo].[tags] t WITH (updlock)
Expand Down
2 changes: 1 addition & 1 deletion scripts/SqlServer/3_Post_Migration_Cleanup.sql
Original file line number Diff line number Diff line change
@@ -1 +1 @@
DROP FUNCTION IF EXISTS [dbo].[Split];
DROP FUNCTION IF EXISTS [dbo].[AkkaMigration_Split];
71 changes: 71 additions & 0 deletions scripts/SqlServer/Batched/1_Migration_Setup.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
IF NOT EXISTS(SELECT * FROM INFORMATION_SCHEMA.TABLES
WHERE TABLE_SCHEMA = N'dbo' AND TABLE_NAME = N'tags')
BEGIN
CREATE TABLE [dbo].[tags](
ordering_id BIGINT NOT NULL,
tag NVARCHAR(64) NOT NULL,
sequence_nr BIGINT NOT NULL,
persistence_id VARCHAR(255) NOT NULL,
PRIMARY KEY (ordering_id, tag, persistence_id)
);
END
GO

CREATE OR ALTER FUNCTION [dbo].[AkkaMigration_Split](@String VARCHAR(8000), @Delimiter CHAR(1))
RETURNS @temptable TABLE (items VARCHAR(8000)) AS
BEGIN
DECLARE @idx INT
DECLARE @slice VARCHAR(8000)

SELECT @idx = 1
IF LEN(@String) < 1 OR @String is NULL
RETURN

WHILE @idx != 0
BEGIN
SET @idx = CHARINDEX(@Delimiter, @String)
IF @idx != 0
SET @slice = LEFT(@String,@idx - 1)
ELSE
SET @slice = @String

IF(LEN(@slice) > 0)
INSERT INTO @temptable(Items) VALUES(@slice)

SET @String = RIGHT(@String, LEN(@String) - @idx)
IF len(@String) = 0
BREAK
END
RETURN
END;
GO

CREATE OR ALTER PROCEDURE [dbo].[AkkaMigration_BatchedMigration](@from_id BIGINT) AS
BEGIN
DECLARE @max_id BIGINT;

SELECT @max_id = MAX(ej.[Ordering])
FROM [dbo].[EventJournal] ej
WHERE
ej.[Tags] IS NOT NULL
AND LEN(ej.[Tags]) > 0
AND ej.[Ordering] NOT IN (SELECT t.[ordering_id] FROM [dbo].[tags] t);

WHILE @from_id <= @max_id
BEGIN
BEGIN TRAN;
INSERT INTO [dbo].[tags]([ordering_id], [tag], [sequence_nr], [persistence_id])
SELECT * FROM (
SELECT records.[Ordering], cross_product.[items], records.SequenceNr, records.PersistenceId FROM
[dbo].[EventJournal] AS records
CROSS APPLY [dbo].[AkkaMigration_Split](records.Tags, ';') cross_product
) AS s([ordering_id], [tag], [sequence_nr], [persistence_id])
WHERE NOT EXISTS (
SELECT * FROM [dbo].[tags] t WITH (updlock)
WHERE s.[ordering_id] = t.[ordering_id] AND s.[tag] = t.[tag]
);
COMMIT TRAN;

SET @from_id = @from_id + 1000;
END
END;
1 change: 1 addition & 0 deletions scripts/SqlServer/Batched/2_Migration.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
EXEC [dbo].[AkkaMigration_BatchedMigration] @from_id = 0;
2 changes: 2 additions & 0 deletions scripts/SqlServer/Batched/3_Post_Migration_Cleanup.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
DROP FUNCTION IF EXISTS [dbo].[AkkaMigration_Split];
DROP PROCEDURE IF EXISTS [dbo].[AkkaMigration_BatchedMigration];
3 changes: 3 additions & 0 deletions scripts/SqlServer/Batched/DROP_Migration.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
DROP TABLE IF EXISTS [dbo].[TagTable];
DROP FUNCTION IF EXISTS [dbo].[AkkaMigration_Split];
DROP PROCEDURE IF EXISTS [dbo].[AkkaMigration_BatchedMigration];
2 changes: 1 addition & 1 deletion scripts/SqlServer/DROP_Migration.sql
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
DROP TABLE IF EXISTS [dbo].[TagTable];
DROP FUNCTION IF EXISTS [dbo].[Split];
DROP FUNCTION IF EXISTS [dbo].[AkkaMigration_Split];