Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions misc/helm-charts/operator/templates/clusterrole.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ rules:
resources:
- materializes
- materializes/status
- environments
- environments/status
- balancers
- balancers/status
- consoles
Expand Down
1 change: 1 addition & 0 deletions src/cloud-resources/src/crd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use mz_ore::retry::Retry;

pub mod balancer;
pub mod console;
pub mod environment;
pub mod generated;
pub mod materialize;
#[cfg(feature = "vpc-endpoints")]
Expand Down
160 changes: 160 additions & 0 deletions src/cloud-resources/src/crd/environment.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::collections::BTreeMap;

use kube::{CustomResource, Resource, ResourceExt};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};

use crate::crd::{ManagedResource, MaterializeCertSpec, new_resource_id};

pub mod v1alpha1 {
use super::*;

#[derive(
CustomResource, Clone, Debug, Default, PartialEq, Deserialize, Serialize, JsonSchema,
)]
#[serde(rename_all = "camelCase")]
#[kube(
namespaced,
group = "materialize.cloud",
version = "v1alpha1",
kind = "Environment",
singular = "environment",
plural = "environments",
status = "EnvironmentStatus"
)]
pub struct EnvironmentSpec {
/// {{<warning>}}
/// Deprecated.
///
/// Use `service_account_annotations` to set "eks.amazonaws.com/role-arn" instead.
/// {{</warning>}}
///
/// If running in AWS, override the IAM role to use to give
/// environmentd access to the persist S3 bucket.
#[kube(deprecated)]
pub environmentd_iam_role_arn: Option<String>,

/// Name of the kubernetes service account to use.
/// If not set, we will create one with the same name as this Materialize object.
pub service_account_name: Option<String>,
/// Annotations to apply to the service account.
///
/// Annotations on service accounts are commonly used by cloud providers for IAM.
/// AWS uses "eks.amazonaws.com/role-arn".
/// Azure uses "azure.workload.identity/client-id", but
/// additionally requires "azure.workload.identity/use": "true" on the pods.
pub service_account_annotations: Option<BTreeMap<String, String>>,
/// Labels to apply to the service account.
pub service_account_labels: Option<BTreeMap<String, String>>,

/// The cert-manager Issuer or ClusterIssuer to use for database internal communication.
/// The `issuerRef` field is required.
/// This currently is only used for environmentd, but will eventually support clusterd.
/// Not yet implemented.
pub internal_certificate_spec: Option<MaterializeCertSpec>,

// This can be set to override the randomly chosen resource id
pub resource_id: Option<String>,
}

impl Environment {
pub fn name_prefixed(&self, suffix: &str) -> String {
format!("mz{}-{}", self.resource_id(), suffix)
}

pub fn resource_id(&self) -> &str {
&self.status.as_ref().unwrap().resource_id
}

pub fn namespace(&self) -> String {
self.meta().namespace.clone().unwrap()
}

pub fn create_service_account(&self) -> bool {
self.spec.service_account_name.is_none()
}

pub fn service_account_name(&self) -> String {
self.spec
.service_account_name
.clone()
.unwrap_or_else(|| self.name_unchecked())
}

pub fn role_name(&self) -> String {
self.name_unchecked()
}

pub fn role_binding_name(&self) -> String {
self.name_unchecked()
}

pub fn app_name(&self) -> String {
"environmentd".to_owned()
}

pub fn balancerd_app_name(&self) -> String {
"balancerd".to_owned()
}

pub fn certificate_name(&self) -> String {
self.name_prefixed("environmentd-external")
}

pub fn certificate_secret_name(&self) -> String {
self.name_prefixed("environmentd-tls")
}

pub fn service_name(&self) -> String {
self.name_prefixed("environmentd")
}

pub fn service_internal_fqdn(&self) -> String {
format!(
"{}.{}.svc.cluster.local",
self.service_name(),
self.namespace(),
)
}

pub fn status(&self) -> EnvironmentStatus {
self.status.clone().unwrap_or_else(|| EnvironmentStatus {
resource_id: self
.spec
.resource_id
.clone()
.unwrap_or_else(new_resource_id),
})
}
}

#[derive(Clone, Debug, Default, PartialEq, Deserialize, Serialize, JsonSchema)]
pub struct EnvironmentStatus {
/// Resource identifier used as a name prefix to avoid pod name collisions.
pub resource_id: String,
}

impl ManagedResource for Environment {
fn default_labels(&self) -> BTreeMap<String, String> {
BTreeMap::from_iter([
(
"materialize.cloud/mz-resource-id".to_owned(),
self.resource_id().to_owned(),
),
(
"materialize.cloud/app".to_owned(),
"environmentd".to_owned(),
),
])
}
}
}
70 changes: 61 additions & 9 deletions src/orchestratord/src/bin/orchestratord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@ use http::HeaderValue;
use k8s_openapi::{
api::{
apps::v1::Deployment,
core::v1::{Affinity, ConfigMap, ResourceRequirements, Service, Toleration},
core::v1::{
Affinity, ConfigMap, ResourceRequirements, Service, ServiceAccount, Toleration,
},
networking::v1::NetworkPolicy,
rbac::v1::{Role, RoleBinding},
},
apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceColumnDefinition,
};
Expand Down Expand Up @@ -319,7 +322,6 @@ async fn run(args: Args) -> Result<(), anyhow::Error> {
console_image_tag_default: args.console_image_tag_default,
console_image_tag_map: args.console_image_tag_map,
aws_account_id: args.aws_account_id,
environmentd_iam_role_arn: args.environmentd_iam_role_arn,
environmentd_connection_role_arn: args.environmentd_connection_role_arn,
aws_secrets_controller_tags: args.aws_secrets_controller_tags,
environmentd_availability_zones: args.environmentd_availability_zones,
Expand All @@ -328,7 +330,6 @@ async fn run(args: Args) -> Result<(), anyhow::Error> {
enable_security_context: args.enable_security_context,
enable_internal_statement_logging: args.enable_internal_statement_logging,
disable_statement_logging: args.disable_statement_logging,
orchestratord_pod_selector_labels: args.orchestratord_pod_selector_labels,
environmentd_node_selector: args.environmentd_node_selector,
environmentd_affinity: args.environmentd_affinity,
environmentd_tolerations: args.environmentd_tolerations,
Expand All @@ -337,11 +338,6 @@ async fn run(args: Args) -> Result<(), anyhow::Error> {
clusterd_affinity: args.clusterd_affinity,
clusterd_tolerations: args.clusterd_tolerations,
image_pull_policy: args.image_pull_policy,
network_policies_internal_enabled: args.network_policies_internal_enabled,
network_policies_ingress_enabled: args.network_policies_ingress_enabled,
network_policies_ingress_cidrs: args.network_policies_ingress_cidrs.clone(),
network_policies_egress_enabled: args.network_policies_egress_enabled,
network_policies_egress_cidrs: args.network_policies_egress_cidrs,
environmentd_cluster_replica_sizes: args.environmentd_cluster_replica_sizes,
bootstrap_default_cluster_replica_size: args
.bootstrap_default_cluster_replica_size,
Expand Down Expand Up @@ -374,7 +370,6 @@ async fn run(args: Args) -> Result<(), anyhow::Error> {
default_certificate_specs: args.default_certificate_specs.clone(),
disable_license_key_checks: args.disable_license_key_checks,
tracing: args.tracing,
orchestratord_namespace: namespace,
},
Arc::clone(&metrics),
client.clone(),
Expand All @@ -385,6 +380,63 @@ async fn run(args: Args) -> Result<(), anyhow::Error> {
.run(),
);

mz_ore::task::spawn(
|| "environment controller",
k8s_controller::Controller::namespaced_all(
client.clone(),
controller::environment::Context::new(controller::environment::Config {
cloud_provider: args.cloud_provider,
environmentd_iam_role_arn: args.environmentd_iam_role_arn,
orchestratord_pod_selector_labels: args.orchestratord_pod_selector_labels,
network_policies_internal_enabled: args.network_policies_internal_enabled,
network_policies_ingress_enabled: args.network_policies_ingress_enabled,
network_policies_ingress_cidrs: args.network_policies_ingress_cidrs.clone(),
network_policies_egress_enabled: args.network_policies_egress_enabled,
network_policies_egress_cidrs: args.network_policies_egress_cidrs,
environmentd_sql_port: args.environmentd_sql_port,
environmentd_http_port: args.environmentd_http_port,
environmentd_internal_http_port: args.environmentd_internal_http_port,
default_certificate_specs: args.default_certificate_specs.clone(),
orchestratord_namespace: namespace,
}),
watcher::Config::default().timeout(29),
)
.with_controller(|controller| {
controller
.owns(
Api::<NetworkPolicy>::all(client.clone()),
watcher::Config::default()
.labels("materialize.cloud/mz-resource-id")
.timeout(29),
)
.owns(
Api::<ServiceAccount>::all(client.clone()),
watcher::Config::default()
.labels("materialize.cloud/mz-resource-id")
.timeout(29),
)
.owns(
Api::<Role>::all(client.clone()),
watcher::Config::default()
.labels("materialize.cloud/mz-resource-id")
.timeout(29),
)
.owns(
Api::<RoleBinding>::all(client.clone()),
watcher::Config::default()
.labels("materialize.cloud/mz-resource-id")
.timeout(29),
)
.owns(
Api::<Certificate>::all(client.clone()),
watcher::Config::default()
.labels("materialize.cloud/mz-resource-id")
.timeout(29),
)
})
.run(),
);

mz_ore::task::spawn(
|| "balancer controller",
k8s_controller::Controller::namespaced_all(
Expand Down
1 change: 1 addition & 0 deletions src/orchestratord/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@

pub mod balancer;
pub mod console;
pub mod environment;
pub mod materialize;
Loading