A production-ready, enterprise-grade data platform template for AWS that integrates real-time streaming, batch processing, machine learning, and business intelligence capabilities. Built entirely with AWS CDK in Python, this template provides a complete foundation for data-driven organizations.
- Amazon Kinesis Data Streams for high-throughput data ingestion (see the consumer sketch after this feature list)
- AWS Lambda for serverless stream processing
- Amazon DynamoDB for low-latency data storage
- Kinesis Data Analytics for real-time SQL analytics
- Amazon EMR for large-scale distributed processing
- AWS Glue ETL for serverless data transformation
- Amazon Athena for interactive SQL queries
- Apache Spark and Apache Hive support
- Amazon S3 multi-tier storage (raw, processed, curated)
- AWS Glue Data Catalog for metadata management
- Amazon Redshift for data warehousing
- Amazon QuickSight for business intelligence
- Amazon SageMaker for model training and deployment
- Feature Store for ML feature management
- MLflow for experiment tracking
- Real-time inference endpoints
- Amazon CloudWatch dashboards and alarms
- AWS CloudTrail for audit logging
- AWS Lake Formation for data governance
- Cost optimization with auto-scaling
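As a concrete illustration of the real-time path (Kinesis records consumed by Lambda and written to DynamoDB), a minimal consumer handler could look like the following sketch; the table name and attribute names are placeholders, not part of the template:
import base64
import json

import boto3

dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table("events-dev")  # placeholder table name


def handler(event, context):
    """Decode Kinesis records and persist them to DynamoDB."""
    for record in event["Records"]:
        # Kinesis delivers the payload base64-encoded
        payload = json.loads(base64.b64decode(record["kinesis"]["data"]))
        table.put_item(Item={
            "event_id": record["kinesis"]["sequenceNumber"],  # placeholder partition key
            "payload": json.dumps(payload),
        })
    return {"processed": len(event["Records"])}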
- AWS Account with appropriate permissions
- AWS CLI configured (aws configure)
- Python 3.9+ installed
- Node.js 14+ (for CDK)
- Docker (for Lambda packaging)
git clone https://github.com/yourusername/aws-data-platform.git
cd aws-data-platform
# Create virtual environment
python -m venv .venv
source .venv/bin/activate # On Windows: .venv\Scripts\activate
# Install dependencies
pip install -r requirements.txt
pip install -r requirements-dev.txt
npm install -g aws-cdk
cdk --version
# Copy environment template
cp .env.example .env
# Edit .env with your configuration
nano .env
Required environment variables (see the sketch after this list for how the CDK app can read them):
AWS_ACCOUNT_ID=123456789012
AWS_REGION=us-east-1
ENVIRONMENT=dev
DATA_LAKE_BUCKET_PREFIX=my-company-data-lake
REDSHIFT_MASTER_USER=admin
NOTIFICATION_EMAIL=data-team@company.com
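How these values reach the CDK app is up to you; below is a minimal sketch of an app entry point that reads them from the process environment (loading the variables via export or a tool such as python-dotenv is assumed, and the stack wiring shown is illustrative):
import os

import aws_cdk as cdk

app = cdk.App()
env = cdk.Environment(
    account=os.environ["AWS_ACCOUNT_ID"],
    region=os.environ.get("AWS_REGION", "us-east-1"),
)
environment = os.environ.get("ENVIRONMENT", "dev")

# Instantiate the stacks defined under infrastructure/stacks/, for example:
# StreamingStack(app, f"DataPlatform-{environment.capitalize()}-Streaming", environment=environment, env=env)

app.synth()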
┌────────────────────────────────────────────────────┐
│                    Data Sources                    │
│ (Applications, IoT Devices, APIs, Databases, Files)│
└──────────┬────────────────────────────┬────────────┘
           │                            │
           ▼                            ▼
┌──────────────────────┐   ┌─────────────────────────┐
│   Real-Time Layer    │   │       Batch Layer       │
├──────────────────────┤   ├─────────────────────────┤
│ • Kinesis Streams    │   │ • S3 Data Lake          │
│ • Lambda Functions   │   │ • Glue ETL Jobs         │
│ • DynamoDB           │   │ • EMR Clusters          │
│ • Kinesis Analytics  │   │ • Athena Queries        │
└──────────┬───────────┘   └────────────┬────────────┘
           │                            │
           ▼                            ▼
┌──────────────────────────────────────────────────┐
│              Processing & Analytics              │
├──────────────────────────────────────────────────┤
│ • Spark Streaming        • Batch ETL             │
│ • ML Feature Eng.        • Data Validation       │
│ • Aggregations           • Data Quality          │
└─────────────────────┬────────────────────────────┘
                      │
                      ▼
┌──────────────────────────────────────────────────┐
│             Storage & Serving Layer              │
├──────────────────────────────────────────────────┤
│ • Redshift (DW)          • DynamoDB (NoSQL)      │
│ • S3 (Data Lake)         • ElasticSearch         │
│ • Feature Store          • Time Series DB        │
└─────────────────────┬────────────────────────────┘
                      │
                      ▼
┌──────────────────────────────────────────────────┐
│                Consumption Layer                 │
├──────────────────────────────────────────────────┤
│ • QuickSight             • SageMaker             │
│ • API Gateway            • Custom Apps           │
│ • Notebooks              • ML Inference          │
└──────────────────────────────────────────────────┘
# Deploy all stacks
./scripts/deploy.sh --all --environment dev
# Or deploy individual components
./scripts/deploy.sh --stack streaming --environment dev
./scripts/deploy.sh --stack batch --environment dev
./scripts/deploy.sh --stack ml --environment dev
# Check stack status
aws cloudformation describe-stacks --query "Stacks[?starts_with(StackName, 'DataPlatform-Dev')].[StackName,StackStatus]" --output table
# Run validation tests
pytest tests/integration/test_deployment.py
aws-data-platform/
├── infrastructure/          # CDK infrastructure code
│   ├── stacks/              # CDK stack definitions
│   │   ├── streaming/       # Real-time streaming stack
│   │   ├── batch/           # Batch processing stack
│   │   ├── storage/         # Data lake & warehouse stack
│   │   ├── ml/              # Machine learning stack
│   │   └── monitoring/      # Monitoring stack
│   ├── constructs/          # Reusable CDK constructs
│   └── configs/             # Environment configurations
├── src/                     # Application source code
│   ├── ingestion/           # Data ingestion modules
│   ├── processing/          # Data processing logic
│   ├── ml/                  # ML pipelines and models
│   └── orchestration/       # Workflow orchestration
├── tests/                   # Test suites
├── scripts/                 # Deployment and utility scripts
└── docs/                    # Documentation
Edit infrastructure/stacks/streaming/kinesis_stack.py:
from aws_cdk import Duration, Stack
from aws_cdk import aws_kinesis as kinesis
from constructs import Construct

class StreamingStack(Stack):
    def __init__(self, scope: Construct, id: str, *, environment: str = "dev", **kwargs):
        super().__init__(scope, id, **kwargs)
        self.environment = environment
        # Add your custom stream
        self.custom_stream = kinesis.Stream(
            self, "CustomDataStream",
            stream_name=f"custom-data-{self.environment}",
            shard_count=2,
            retention_period=Duration.days(7)
        )
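To attach a consumer to the new stream inside the same __init__, an event source mapping can be added; this is a sketch, and the function name and asset path are placeholders rather than part of the template:
from aws_cdk import aws_lambda as lambda_
from aws_cdk import aws_lambda_event_sources as event_sources

# Placeholder consumer function; point the asset at your handler code
process_events_fn = lambda_.Function(
    self, "ProcessEventsFunction",
    runtime=lambda_.Runtime.PYTHON_3_9,
    handler="handler.handler",
    code=lambda_.Code.from_asset("src/ingestion/stream_consumer"),
)
# Poll the custom stream and invoke the function in batches
process_events_fn.add_event_source(
    event_sources.KinesisEventSource(
        self.custom_stream,
        starting_position=lambda_.StartingPosition.LATEST,
        batch_size=100,
    )
)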
Modify src/processing/batch/spark_jobs.py:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

class DataProcessor:
    def __init__(self, app_name="DataProcessor"):
        self.spark = SparkSession.builder \
            .appName(app_name) \
            .config("spark.sql.adaptive.enabled", "true") \
            .getOrCreate()

    def process_sales_data(self, input_path, output_path):
        """
        Customize this method for your business logic
        """
        df = self.spark.read.parquet(input_path)
        # Add your transformations
        processed_df = df \
            .filter(F.col("amount") > 0) \
            .groupBy("product_id", "date") \
            .agg(
                F.sum("amount").alias("total_revenue"),
                F.count("transaction_id").alias("transaction_count")
            )
        processed_df.write \
            .mode("overwrite") \
            .partitionBy("date") \
            .parquet(output_path)
Configure ML workflows in src/ml/training/pipeline.py:
import sagemaker
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.pipeline import Pipeline

class MLPipeline:
    def __init__(self, role, bucket):
        self.role = role
        self.bucket = bucket
        self.session = sagemaker.Session()

    def create_training_pipeline(self, model_name):
        """
        Define your ML training pipeline
        """
        # Feature engineering step
        processor = SKLearnProcessor(
            framework_version="0.23-1",
            instance_type="ml.m5.xlarge",
            instance_count=1,
            role=self.role
        )
        # Add your pipeline steps here; preprocessing_step, training_step,
        # and evaluation_step must be defined before the pipeline is built
        # ...
        return Pipeline(
            name=f"{model_name}-training",
            steps=[preprocessing_step, training_step, evaluation_step]
        )
Set up QuickSight dashboards in infrastructure/stacks/analytics/quicksight.py:
from aws_cdk import Stack
from aws_cdk import aws_quicksight as qs
from constructs import Construct

class QuickSightDashboard(Construct):
    def __init__(self, scope, id, data_source_arn):
        super().__init__(scope, id)
        # Create custom dashboard
        self.dashboard = qs.CfnDashboard(
            self, "BusinessDashboard",
            aws_account_id=Stack.of(self).account,
            dashboard_id="business-metrics",
            name="Business Metrics Dashboard",
            source_entity=qs.CfnDashboard.DashboardSourceEntityProperty(
                source_template=qs.CfnDashboard.DashboardSourceTemplateProperty(
                    # A source template ARN (arn=...) is also required here
                    data_set_references=[
                        # Configure your datasets
                    ]
                )
            )
        )
The platform implements least-privilege access:
# infrastructure/constructs/security.py
from aws_cdk import aws_iam as iam

class DataPlatformSecurity:
    @staticmethod
    def create_glue_role(scope, id, bucket):
        """Create IAM role for Glue ETL jobs with scoped S3 access"""
        return iam.Role(
            scope, id,
            assumed_by=iam.ServicePrincipal("glue.amazonaws.com"),
            managed_policies=[
                iam.ManagedPolicy.from_aws_managed_policy_name(
                    "service-role/AWSGlueServiceRole"
                )
            ],
            inline_policies={
                "S3Access": iam.PolicyDocument(
                    statements=[
                        iam.PolicyStatement(
                            actions=["s3:GetObject", "s3:PutObject"],
                            resources=[f"arn:aws:s3:::{bucket}/*"]
                        )
                    ]
                )
            }
        )
All data is encrypted at rest and in transit; a bucket-level CDK example follows the list:
- S3: SSE-S3 or SSE-KMS
- Redshift: KMS encryption
- DynamoDB: Encryption at rest
- Kinesis: Server-side encryption
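For example, a data lake bucket with SSE-KMS and TLS-only access could be declared in a storage stack like this (construct id and variable name are illustrative):
from aws_cdk import aws_s3 as s3

raw_bucket = s3.Bucket(
    self, "RawDataBucket",
    encryption=s3.BucketEncryption.KMS_MANAGED,         # SSE-KMS with an AWS-managed key
    enforce_ssl=True,                                   # deny requests not made over TLS
    block_public_access=s3.BlockPublicAccess.BLOCK_ALL,
)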
Pre-configured dashboards for:
- Stream processing metrics
- ETL job performance
- Data quality metrics
- ML model performance
- Cost tracking
Automated alerts (see the alarm sketch after this list) for:
- Failed ETL jobs
- Stream processing errors
- Data quality violations
- Cost anomalies
- Security events
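As one example, a CloudWatch alarm on stream-processing Lambda errors can notify the team through SNS; this sketch assumes it lives in the monitoring stack and that process_events_fn and the notification address are available there:
from aws_cdk import Duration
from aws_cdk import aws_cloudwatch as cloudwatch
from aws_cdk import aws_cloudwatch_actions as cw_actions
from aws_cdk import aws_sns as sns
from aws_cdk import aws_sns_subscriptions as subs

# Alert topic wired to the address from NOTIFICATION_EMAIL
alerts_topic = sns.Topic(self, "DataPlatformAlerts")
alerts_topic.add_subscription(subs.EmailSubscription("data-team@company.com"))

# Raise the alarm as soon as the stream-processing Lambda reports an error
errors_alarm = cloudwatch.Alarm(
    self, "StreamProcessingErrors",
    metric=process_events_fn.metric_errors(period=Duration.minutes(5)),
    threshold=1,
    evaluation_periods=1,
)
errors_alarm.add_alarm_action(cw_actions.SnsAction(alerts_topic))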
# infrastructure/configs/scaling.yaml
emr_cluster:
min_instances: 2
max_instances: 10
target_utilization: 70
scale_down_delay: 300
kinesis_streams:
auto_scaling_enabled: true
target_utilization: 70
scale_in_cooldown: 60
scale_out_cooldown: 60
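Stacks can read this file at synth time; a minimal sketch, assuming PyYAML is available and the file lives at infrastructure/configs/scaling.yaml:
from pathlib import Path

import yaml  # PyYAML, assumed to be listed in requirements.txt

scaling = yaml.safe_load(Path("infrastructure/configs/scaling.yaml").read_text())

emr_max_instances = scaling["emr_cluster"]["max_instances"]               # 10
kinesis_autoscaling = scaling["kinesis_streams"]["auto_scaling_enabled"]  # True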
Tags.of(stack).add("Environment", environment)
Tags.of(stack).add("Project", "DataPlatform")
Tags.of(stack).add("CostCenter", "DataEngineering")
Tags.of(stack).add("Owner", "data-team@company.com")# Run unit tests
pytest tests/unit/ -v
# With coverage
pytest tests/unit/ --cov=src --cov-report=html
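A typical unit test asserts on the synthesized CloudFormation template with aws_cdk.assertions; a minimal sketch for the streaming stack (the import path is illustrative):
import aws_cdk as cdk
from aws_cdk.assertions import Template

from infrastructure.stacks.streaming.kinesis_stack import StreamingStack  # illustrative import path


def test_streaming_stack_creates_stream():
    app = cdk.App()
    stack = StreamingStack(app, "TestStreaming", environment="test")
    template = Template.from_stack(stack)

    # Exactly one Kinesis stream with seven-day retention should be synthesized
    template.resource_count_is("AWS::Kinesis::Stream", 1)
    template.has_resource_properties(
        "AWS::Kinesis::Stream", {"RetentionPeriodHours": 168}
    )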
# Test data pipeline
pytest tests/integration/test_pipeline.py
# Test ML workflows
pytest tests/integration/test_ml_pipeline.py
# Generate test data
python scripts/generate_test_data.py --records 1000000
# Run load test
locust -f tests/load/test_streaming.py --host https://kinesis.us-east-1.amazonaws.com
Monitor business metrics in real-time:
- Sales transactions
- User activity
- System performance
- Fraud detection
Unified customer data platform:
- Profile aggregation
- Behavior tracking
- Segmentation
- Personalization
IoT data processing for:
- Anomaly detection
- Failure prediction
- Optimization recommendations
Automated financial analytics:
- Revenue forecasting
- Cost analysis
- Compliance reporting
- Risk assessment
- Stack deployment fails
# Check CloudFormation events
aws cloudformation describe-stack-events --stack-name DataPlatform-Dev-Streaming
- Glue job failures
# Check Glue job logs
aws glue get-job-runs --job-name my-etl-job
- Permission errors
# Verify IAM roles (simulate-principal-policy requires --action-names)
aws iam simulate-principal-policy --policy-source-arn arn:aws:iam::123456789012:role/GlueRole --action-names s3:GetObject
We welcome contributions! Please see CONTRIBUTING.md for details.
This project is licensed under the MIT License - see LICENSE file.
- Documentation: docs/
- Issues: GitHub Issues
- Discussions: GitHub Discussions
Built with AWS best practices and community contributions.
Note: This is a template repository. Customize it according to your organization's specific requirements.