Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
JohannesKonings committed Jan 8, 2024
1 parent f879fc1 commit e6a749b
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 50 deletions.
43 changes: 1 addition & 42 deletions cdk/lib/cdk-stack.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,53 +115,12 @@ export class CdkStack extends Stack {
glueSecurityConfiguration: glueSecurityConfiguration,
glueDb: glueDb,
table: table,
tableName: name,
})
break;
default: throw new Error('kinesisFormat not supported');
}



// parquet format
// const s3Destination = new destinationsAlpha.S3Bucket(firehoseBucket, {
// encryptionKey: kmsKey,
// bufferingInterval: Duration.seconds(60),
// processor: lambdaProcessor,
// bufferingSize: Size.mebibytes(64),
// });



// // https://5k-team.trilogy.com/hc/en-us/articles/360015651640-Configuring-Firehose-with-CDK
// // https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-kinesisfirehose-deliverystream.html
// const firehoseDeliveryStreamCfn = firehoseDeliveryStream.node.defaultChild as CfnDeliveryStream;
// firehoseDeliveryStreamCfn.addPropertyOverride('ExtendedS3DestinationConfiguration.DataFormatConversionConfiguration', {
// inputFormatConfiguration: {
// deserializer: {
// // These settings might need to be changed based on the use case
// // These are the default settings when configured through the console
// openXJsonSerDe: {
// caseInsensitive: false,
// // Add hive keywords (e.g. timestamp) if they are added to events schema
// columnToJsonKeyMappings: {},
// convertDotsInJsonKeysToUnderscores: false,
// },
// },
// },
// outputFormatConfiguration: {
// serializer: {
// parquetSerDe: {
// compression: 'SNAPPY',
// },
// },
// },
// schemaConfiguration: {
// databaseName: this.backendStack.glueStack.database.databaseName, // Target Glue database name
// roleArn: this.deliveryStreamRole.roleArn,
// tableName: this.backendStack.glueStack.eventsTable.tableName, // Target Glue table name
// },
// });

const athenaQueryResults = new s3.Bucket(this, 'query-results', {
bucketName: `${name}-query-results`,
encryptionKey: kmsKey,
Expand Down
70 changes: 62 additions & 8 deletions cdk/lib/firehose/firehose-parquet.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,13 @@ import {
aws_logs as logs,
RemovalPolicy,
Stack,
Size,
} from 'aws-cdk-lib';
import {
LambdaFunctionProcessor as LambdaFunctionProcessorAlpha,
DeliveryStream as DeliveryStreamAlpha
} from '@aws-cdk/aws-kinesisfirehose-alpha'
import * as glueAlpha from '@aws-cdk/aws-glue-alpha'
import { CfnDeliveryStream } from 'aws-cdk-lib/aws-kinesisfirehose';


export interface FirehoseParquetProps {
Expand All @@ -28,13 +29,14 @@ export interface FirehoseParquetProps {
glueSecurityConfiguration: glueAlpha.SecurityConfiguration
glueDb: glueAlpha.Database
table: dynamodb.ITable
tableName: string
}

export class FirehoseParquet extends Construct {
constructor(scope: Construct, id: string, props: FirehoseParquetProps) {
super(scope, id)

const { kmsKey, firehoseBucket, name, ddbChangesPrefix, stream, glueSecurityConfiguration, glueDb, table } = props
const { kmsKey, firehoseBucket, name, ddbChangesPrefix, stream, glueSecurityConfiguration, glueDb, table, tableName } = props
const roleName = `${name}-crawler-ddb-role`;
const roleCrawlerddb = new iam.Role(this, 'roleCrawlerDdb', {
roleName: roleName,
Expand Down Expand Up @@ -76,17 +78,15 @@ export class FirehoseParquet extends Construct {
glueDb.catalogArn,
glueDb.databaseArn,
kmsKey.keyArn,
firehoseBucket.bucketArn,
`${firehoseBucket.bucketArn}/*`,
glueCrawlerArn,
table.tableArn,
],
actions: [
'logs:*',
'glue:*',
'kms:Decrypt',
'S3:*',
'dynamodb:DescribeTable',
'dynamodb:Scan',
],
})
)
Expand All @@ -98,22 +98,76 @@ export class FirehoseParquet extends Construct {
)
glueSecurityConfiguration.node.addDependency(roleCrawlerddb)


const s3Destination = new destinationsAlpha.S3Bucket(firehoseBucket, {
encryptionKey: kmsKey,
bufferingInterval: Duration.seconds(60),
dataOutputPrefix: `${ddbChangesPrefix}/`,
logGroup: new logs.LogGroup(this, 'firehose--parquet-s3-log-group', {
logGroup: new logs.LogGroup(this, 'firehose-parquet-s3-log-group', {
logGroupName: `${name}-firehose-parquet-s3-log-group`,
removalPolicy: RemovalPolicy.DESTROY,
}),
bufferingSize: Size.mebibytes(64),
})

new DeliveryStreamAlpha(this, 'Delivery Stream', {
const glueTableName = tableName.replace(/-/g, '_')
console.log(`glueTableName: ${glueTableName}`)
// https://5k-team.trilogy.com/hc/en-us/articles/360015651640-Configuring-Firehose-with-CDK
// https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-kinesisfirehose-deliverystream.html


const firehoseDeliveryStream = new DeliveryStreamAlpha(this, 'Delivery Stream', {
deliveryStreamName: `${name}-firehose-parquet`,
sourceStream: stream,
destinations: [s3Destination],
})

// Look up the role used by the S3 destination. findChild throws when no child
// construct with this id exists.
// NOTE(review): the id 'S3 Destination Role' is presumably what the alpha S3
// destination assigns to the role it creates when none is supplied - confirm
// against the installed @aws-cdk/aws-kinesisfirehose-destinations-alpha version.
const firehoseRole = firehoseDeliveryStream.node.findChild('S3 Destination Role') as iam.Role;

// Record format conversion reads the target schema from the Glue Data Catalog.
// Grant only the read-only table actions needed for that, scoped to the
// catalog, the database and the target table, rather than `*` on `*`.
const glueTableArn = Stack.of(this).formatArn({
  service: 'glue',
  resource: 'table',
  resourceName: `${glueDb.databaseName}/${glueTableName}`,
});
firehoseRole.addToPolicy(
  new iam.PolicyStatement({
    effect: iam.Effect.ALLOW,
    resources: [
      glueDb.catalogArn,
      glueDb.databaseArn,
      glueTableArn,
    ],
    actions: [
      'glue:GetTable',
      'glue:GetTableVersion',
      'glue:GetTableVersions',
    ],
  })
);

// Drop down to the underlying CloudFormation resource so that record format
// conversion (JSON -> Parquet) can be configured through a raw property override.
const firehoseDeliveryStreamCfn = firehoseDeliveryStream.node.defaultChild as CfnDeliveryStream;

// addPropertyOverride writes this object into the template as-is, so every key
// must be the CloudFormation property name (PascalCase), not the camelCase
// name used by the L1 construct props.
firehoseDeliveryStreamCfn.addPropertyOverride('ExtendedS3DestinationConfiguration.DataFormatConversionConfiguration', {
  InputFormatConfiguration: {
    Deserializer: {
      // These settings might need to be changed based on the use case.
      // These are the default settings when configured through the console.
      OpenXJsonSerDe: {
        CaseInsensitive: false,
        // Add hive keywords (e.g. timestamp) if they are added to the events schema
        ColumnToJsonKeyMappings: {},
        ConvertDotsInJsonKeysToUnderscores: false,
      },
    },
  },
  OutputFormatConfiguration: {
    Serializer: {
      ParquetSerDe: {
        Compression: 'SNAPPY',
      },
    },
  },
  // Glue table whose schema is used for the conversion, and the role used to read it.
  SchemaConfiguration: {
    DatabaseName: glueDb.databaseName,
    RoleARN: firehoseRole.roleArn,
    TableName: glueTableName,
  },
});
// Declare an explicit dependency of the delivery stream resource on the role.
firehoseDeliveryStreamCfn.node.addDependency(firehoseRole);

}
}

0 comments on commit e6a749b

Please sign in to comment.