From 3ab5a362bc1da20923741a8c2ae61f210c61efb3 Mon Sep 17 00:00:00 2001 From: Jesse Luehrs Date: Wed, 17 Dec 2025 14:54:08 -0500 Subject: [PATCH 1/2] move global environment resources to their own controller --- .../operator/templates/clusterrole.yaml | 2 + src/cloud-resources/src/crd.rs | 1 + src/cloud-resources/src/crd/environment.rs | 160 ++++++ src/orchestratord/src/bin/orchestratord.rs | 72 ++- src/orchestratord/src/controller.rs | 1 + .../src/controller/environment.rs | 485 ++++++++++++++++++ src/orchestratord/src/k8s.rs | 4 + 7 files changed, 720 insertions(+), 5 deletions(-) create mode 100644 src/cloud-resources/src/crd/environment.rs create mode 100644 src/orchestratord/src/controller/environment.rs diff --git a/misc/helm-charts/operator/templates/clusterrole.yaml b/misc/helm-charts/operator/templates/clusterrole.yaml index ce265f5d89ebc..420e23900ccd8 100644 --- a/misc/helm-charts/operator/templates/clusterrole.yaml +++ b/misc/helm-charts/operator/templates/clusterrole.yaml @@ -88,6 +88,8 @@ rules: resources: - materializes - materializes/status + - environments + - environments/status - balancers - balancers/status - consoles diff --git a/src/cloud-resources/src/crd.rs b/src/cloud-resources/src/crd.rs index b59615597a44b..a539577f70e09 100644 --- a/src/cloud-resources/src/crd.rs +++ b/src/cloud-resources/src/crd.rs @@ -33,6 +33,7 @@ use mz_ore::retry::Retry; pub mod balancer; pub mod console; +pub mod environment; pub mod generated; pub mod materialize; #[cfg(feature = "vpc-endpoints")] diff --git a/src/cloud-resources/src/crd/environment.rs b/src/cloud-resources/src/crd/environment.rs new file mode 100644 index 0000000000000..a522fd653728f --- /dev/null +++ b/src/cloud-resources/src/crd/environment.rs @@ -0,0 +1,160 @@ +// Copyright Materialize, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use std::collections::BTreeMap; + +use kube::{CustomResource, Resource, ResourceExt}; +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; + +use crate::crd::{ManagedResource, MaterializeCertSpec, new_resource_id}; + +pub mod v1alpha1 { + use super::*; + + #[derive( + CustomResource, Clone, Debug, Default, PartialEq, Deserialize, Serialize, JsonSchema, + )] + #[serde(rename_all = "camelCase")] + #[kube( + namespaced, + group = "materialize.cloud", + version = "v1alpha1", + kind = "Environment", + singular = "environment", + plural = "environments", + status = "EnvironmentStatus" + )] + pub struct EnvironmentSpec { + /// {{}} + /// Deprecated. + /// + /// Use `service_account_annotations` to set "eks.amazonaws.com/role-arn" instead. + /// {{}} + /// + /// If running in AWS, override the IAM role to use to give + /// environmentd access to the persist S3 bucket. + #[kube(deprecated)] + pub environmentd_iam_role_arn: Option, + + /// Name of the kubernetes service account to use. + /// If not set, we will create one with the same name as this Materialize object. + pub service_account_name: Option, + /// Annotations to apply to the service account. + /// + /// Annotations on service accounts are commonly used by cloud providers for IAM. + /// AWS uses "eks.amazonaws.com/role-arn". + /// Azure uses "azure.workload.identity/client-id", but + /// additionally requires "azure.workload.identity/use": "true" on the pods. + pub service_account_annotations: Option>, + /// Labels to apply to the service account. + pub service_account_labels: Option>, + + /// The cert-manager Issuer or ClusterIssuer to use for database internal communication. + /// The `issuerRef` field is required. + /// This currently is only used for environmentd, but will eventually support clusterd. + /// Not yet implemented. + pub internal_certificate_spec: Option, + + // This can be set to override the randomly chosen resource id + pub resource_id: Option, + } + + impl Environment { + pub fn name_prefixed(&self, suffix: &str) -> String { + format!("mz{}-{}", self.resource_id(), suffix) + } + + pub fn resource_id(&self) -> &str { + &self.status.as_ref().unwrap().resource_id + } + + pub fn namespace(&self) -> String { + self.meta().namespace.clone().unwrap() + } + + pub fn create_service_account(&self) -> bool { + self.spec.service_account_name.is_none() + } + + pub fn service_account_name(&self) -> String { + self.spec + .service_account_name + .clone() + .unwrap_or_else(|| self.name_unchecked()) + } + + pub fn role_name(&self) -> String { + self.name_unchecked() + } + + pub fn role_binding_name(&self) -> String { + self.name_unchecked() + } + + pub fn app_name(&self) -> String { + "environmentd".to_owned() + } + + pub fn balancerd_app_name(&self) -> String { + "balancerd".to_owned() + } + + pub fn certificate_name(&self) -> String { + self.name_prefixed("environmentd-external") + } + + pub fn certificate_secret_name(&self) -> String { + self.name_prefixed("environmentd-tls") + } + + pub fn service_name(&self) -> String { + self.name_prefixed("environmentd") + } + + pub fn service_internal_fqdn(&self) -> String { + format!( + "{}.{}.svc.cluster.local", + self.service_name(), + self.namespace(), + ) + } + + pub fn status(&self) -> EnvironmentStatus { + self.status.clone().unwrap_or_else(|| EnvironmentStatus { + resource_id: self + .spec + .resource_id + .clone() + .unwrap_or_else(new_resource_id), + }) + } + } + + #[derive(Clone, Debug, Default, PartialEq, Deserialize, Serialize, JsonSchema)] + pub struct EnvironmentStatus { + /// Resource identifier used as a name prefix to avoid pod name collisions. + pub resource_id: String, + } + + impl ManagedResource for Environment { + fn default_labels(&self) -> BTreeMap { + BTreeMap::from_iter([ + ( + "materialize.cloud/mz-resource-id".to_owned(), + self.resource_id().to_owned(), + ), + ( + "materialize.cloud/app".to_owned(), + "environmentd".to_owned(), + ), + ]) + } + } +} diff --git a/src/orchestratord/src/bin/orchestratord.rs b/src/orchestratord/src/bin/orchestratord.rs index f967dc3e7235d..0ed8cb5761533 100644 --- a/src/orchestratord/src/bin/orchestratord.rs +++ b/src/orchestratord/src/bin/orchestratord.rs @@ -17,8 +17,11 @@ use http::HeaderValue; use k8s_openapi::{ api::{ apps::v1::Deployment, - core::v1::{Affinity, ConfigMap, ResourceRequirements, Service, Toleration}, + core::v1::{ + Affinity, ConfigMap, ResourceRequirements, Service, ServiceAccount, Toleration, + }, networking::v1::NetworkPolicy, + rbac::v1::{Role, RoleBinding}, }, apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceColumnDefinition, }; @@ -319,7 +322,7 @@ async fn run(args: Args) -> Result<(), anyhow::Error> { console_image_tag_default: args.console_image_tag_default, console_image_tag_map: args.console_image_tag_map, aws_account_id: args.aws_account_id, - environmentd_iam_role_arn: args.environmentd_iam_role_arn, + environmentd_iam_role_arn: args.environmentd_iam_role_arn.clone(), environmentd_connection_role_arn: args.environmentd_connection_role_arn, aws_secrets_controller_tags: args.aws_secrets_controller_tags, environmentd_availability_zones: args.environmentd_availability_zones, @@ -328,7 +331,9 @@ async fn run(args: Args) -> Result<(), anyhow::Error> { enable_security_context: args.enable_security_context, enable_internal_statement_logging: args.enable_internal_statement_logging, disable_statement_logging: args.disable_statement_logging, - orchestratord_pod_selector_labels: args.orchestratord_pod_selector_labels, + orchestratord_pod_selector_labels: args + .orchestratord_pod_selector_labels + .clone(), environmentd_node_selector: args.environmentd_node_selector, environmentd_affinity: args.environmentd_affinity, environmentd_tolerations: args.environmentd_tolerations, @@ -341,7 +346,7 @@ async fn run(args: Args) -> Result<(), anyhow::Error> { network_policies_ingress_enabled: args.network_policies_ingress_enabled, network_policies_ingress_cidrs: args.network_policies_ingress_cidrs.clone(), network_policies_egress_enabled: args.network_policies_egress_enabled, - network_policies_egress_cidrs: args.network_policies_egress_cidrs, + network_policies_egress_cidrs: args.network_policies_egress_cidrs.clone(), environmentd_cluster_replica_sizes: args.environmentd_cluster_replica_sizes, bootstrap_default_cluster_replica_size: args .bootstrap_default_cluster_replica_size, @@ -374,7 +379,7 @@ async fn run(args: Args) -> Result<(), anyhow::Error> { default_certificate_specs: args.default_certificate_specs.clone(), disable_license_key_checks: args.disable_license_key_checks, tracing: args.tracing, - orchestratord_namespace: namespace, + orchestratord_namespace: namespace.clone(), }, Arc::clone(&metrics), client.clone(), @@ -385,6 +390,63 @@ async fn run(args: Args) -> Result<(), anyhow::Error> { .run(), ); + mz_ore::task::spawn( + || "environment controller", + k8s_controller::Controller::namespaced_all( + client.clone(), + controller::environment::Context::new(controller::environment::Config { + cloud_provider: args.cloud_provider, + environmentd_iam_role_arn: args.environmentd_iam_role_arn, + orchestratord_pod_selector_labels: args.orchestratord_pod_selector_labels, + network_policies_internal_enabled: args.network_policies_internal_enabled, + network_policies_ingress_enabled: args.network_policies_ingress_enabled, + network_policies_ingress_cidrs: args.network_policies_ingress_cidrs.clone(), + network_policies_egress_enabled: args.network_policies_egress_enabled, + network_policies_egress_cidrs: args.network_policies_egress_cidrs, + environmentd_sql_port: args.environmentd_sql_port, + environmentd_http_port: args.environmentd_http_port, + environmentd_internal_http_port: args.environmentd_internal_http_port, + default_certificate_specs: args.default_certificate_specs.clone(), + orchestratord_namespace: namespace, + }), + watcher::Config::default().timeout(29), + ) + .with_controller(|controller| { + controller + .owns( + Api::::all(client.clone()), + watcher::Config::default() + .labels("materialize.cloud/mz-resource-id") + .timeout(29), + ) + .owns( + Api::::all(client.clone()), + watcher::Config::default() + .labels("materialize.cloud/mz-resource-id") + .timeout(29), + ) + .owns( + Api::::all(client.clone()), + watcher::Config::default() + .labels("materialize.cloud/mz-resource-id") + .timeout(29), + ) + .owns( + Api::::all(client.clone()), + watcher::Config::default() + .labels("materialize.cloud/mz-resource-id") + .timeout(29), + ) + .owns( + Api::::all(client.clone()), + watcher::Config::default() + .labels("materialize.cloud/mz-resource-id") + .timeout(29), + ) + }) + .run(), + ); + mz_ore::task::spawn( || "balancer controller", k8s_controller::Controller::namespaced_all( diff --git a/src/orchestratord/src/controller.rs b/src/orchestratord/src/controller.rs index a4d0b8c23bb4e..8d4f3c67461ae 100644 --- a/src/orchestratord/src/controller.rs +++ b/src/orchestratord/src/controller.rs @@ -9,4 +9,5 @@ pub mod balancer; pub mod console; +pub mod environment; pub mod materialize; diff --git a/src/orchestratord/src/controller/environment.rs b/src/orchestratord/src/controller/environment.rs new file mode 100644 index 0000000000000..5a9905ee7e284 --- /dev/null +++ b/src/orchestratord/src/controller/environment.rs @@ -0,0 +1,485 @@ +// Copyright Materialize, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use std::collections::BTreeMap; + +use k8s_openapi::{ + api::{ + core::v1::ServiceAccount, + networking::v1::{ + IPBlock, NetworkPolicy, NetworkPolicyEgressRule, NetworkPolicyIngressRule, + NetworkPolicyPeer, NetworkPolicyPort, NetworkPolicySpec, + }, + rbac::v1::{PolicyRule, Role, RoleBinding, RoleRef, Subject}, + }, + apimachinery::pkg::{apis::meta::v1::LabelSelector, util::intstr::IntOrString}, +}; +use kube::{ + Api, Client, Resource, ResourceExt, + api::{ObjectMeta, PostParams}, + runtime::controller::Action, +}; +use maplit::btreemap; +use mz_cloud_provider::CloudProvider; +use tracing::{trace, warn}; + +use crate::{ + Error, + k8s::apply_resource, + tls::{DefaultCertificateSpecs, create_certificate}, +}; +use mz_cloud_resources::crd::{ + ManagedResource, + environment::v1alpha1::Environment, + generated::cert_manager::certificates::{Certificate, CertificatePrivateKeyAlgorithm}, +}; +use mz_ore::{cli::KeyValueArg, instrument}; + +pub struct Config { + pub cloud_provider: CloudProvider, + + pub environmentd_iam_role_arn: Option, + + pub orchestratord_pod_selector_labels: Vec>, + pub network_policies_internal_enabled: bool, + pub network_policies_ingress_enabled: bool, + pub network_policies_ingress_cidrs: Vec, + pub network_policies_egress_enabled: bool, + pub network_policies_egress_cidrs: Vec, + + pub environmentd_sql_port: u16, + pub environmentd_http_port: u16, + pub environmentd_internal_http_port: u16, + + pub default_certificate_specs: DefaultCertificateSpecs, + + pub orchestratord_namespace: String, +} + +pub struct Context { + config: Config, +} + +impl Context { + pub fn new(config: Config) -> Self { + Self { config } + } + + fn create_network_policies(&self, environment: &Environment) -> Vec { + let mut network_policies = Vec::new(); + if self.config.network_policies_internal_enabled { + let environmentd_label_selector = LabelSelector { + match_labels: Some( + environment + .default_labels() + .into_iter() + .chain([("materialize.cloud/app".to_owned(), environment.app_name())]) + .collect(), + ), + ..Default::default() + }; + let orchestratord_label_selector = LabelSelector { + match_labels: Some( + self.config + .orchestratord_pod_selector_labels + .iter() + .cloned() + .map(|kv| (kv.key, kv.value)) + .collect(), + ), + ..Default::default() + }; + // TODO (Alex) filter to just clusterd and environmentd, + // once we get a consistent set of labels for both. + let all_pods_label_selector = LabelSelector { + // TODO: can't use default_labels() here because it needs to be + // consistent between balancer and materialize resources, and + // materialize resources have additional labels - we should + // figure out something better here (probably balancers should + // install their own network policies) + match_labels: Some( + [( + "materialize.cloud/mz-resource-id".to_owned(), + environment.resource_id().to_owned(), + )] + .into(), + ), + ..Default::default() + }; + network_policies.extend([ + // Allow all clusterd/environmentd traffic (between pods in the + // same environment) + NetworkPolicy { + metadata: environment.managed_resource_meta( + environment.name_prefixed("allow-all-within-environment"), + ), + spec: Some(NetworkPolicySpec { + egress: Some(vec![NetworkPolicyEgressRule { + to: Some(vec![NetworkPolicyPeer { + pod_selector: Some(all_pods_label_selector.clone()), + ..Default::default() + }]), + ..Default::default() + }]), + ingress: Some(vec![NetworkPolicyIngressRule { + from: Some(vec![NetworkPolicyPeer { + pod_selector: Some(all_pods_label_selector.clone()), + ..Default::default() + }]), + ..Default::default() + }]), + pod_selector: Some(all_pods_label_selector.clone()), + policy_types: Some(vec!["Ingress".to_owned(), "Egress".to_owned()]), + ..Default::default() + }), + }, + // Allow traffic from orchestratord to environmentd in order to hit + // the promotion endpoints during upgrades + NetworkPolicy { + metadata: environment + .managed_resource_meta(environment.name_prefixed("allow-orchestratord")), + spec: Some(NetworkPolicySpec { + ingress: Some(vec![NetworkPolicyIngressRule { + from: Some(vec![NetworkPolicyPeer { + namespace_selector: Some(LabelSelector { + match_labels: Some(btreemap! { + "kubernetes.io/metadata.name".into() + => self.config.orchestratord_namespace.clone(), + }), + ..Default::default() + }), + pod_selector: Some(orchestratord_label_selector), + ..Default::default() + }]), + ports: Some(vec![ + NetworkPolicyPort { + port: Some(IntOrString::Int( + self.config.environmentd_http_port.into(), + )), + protocol: Some("TCP".to_string()), + ..Default::default() + }, + NetworkPolicyPort { + port: Some(IntOrString::Int( + self.config.environmentd_internal_http_port.into(), + )), + protocol: Some("TCP".to_string()), + ..Default::default() + }, + ]), + ..Default::default() + }]), + pod_selector: Some(environmentd_label_selector), + policy_types: Some(vec!["Ingress".to_owned()]), + ..Default::default() + }), + }, + ]); + } + if self.config.network_policies_ingress_enabled { + let mut ingress_label_selector = environment.default_labels(); + ingress_label_selector.insert( + "materialize.cloud/app".to_owned(), + environment.balancerd_app_name(), + ); + network_policies.extend([NetworkPolicy { + metadata: environment + .managed_resource_meta(environment.name_prefixed("sql-and-http-ingress")), + spec: Some(NetworkPolicySpec { + ingress: Some(vec![NetworkPolicyIngressRule { + from: Some( + self.config + .network_policies_ingress_cidrs + .iter() + .map(|cidr| NetworkPolicyPeer { + ip_block: Some(IPBlock { + cidr: cidr.to_owned(), + except: None, + }), + ..Default::default() + }) + .collect(), + ), + ports: Some(vec![ + NetworkPolicyPort { + port: Some(IntOrString::Int( + self.config.environmentd_http_port.into(), + )), + protocol: Some("TCP".to_string()), + ..Default::default() + }, + NetworkPolicyPort { + port: Some(IntOrString::Int( + self.config.environmentd_sql_port.into(), + )), + protocol: Some("TCP".to_string()), + ..Default::default() + }, + ]), + ..Default::default() + }]), + pod_selector: Some(LabelSelector { + match_expressions: None, + match_labels: Some(ingress_label_selector), + }), + policy_types: Some(vec!["Ingress".to_owned()]), + ..Default::default() + }), + }]); + } + if self.config.network_policies_egress_enabled { + network_policies.extend([NetworkPolicy { + metadata: environment + .managed_resource_meta(environment.name_prefixed("sources-and-sinks-egress")), + spec: Some(NetworkPolicySpec { + egress: Some(vec![NetworkPolicyEgressRule { + to: Some( + self.config + .network_policies_egress_cidrs + .iter() + .map(|cidr| NetworkPolicyPeer { + ip_block: Some(IPBlock { + cidr: cidr.to_owned(), + except: None, + }), + ..Default::default() + }) + .collect(), + ), + ..Default::default() + }]), + pod_selector: Some(LabelSelector { + match_expressions: None, + match_labels: Some(environment.default_labels()), + }), + policy_types: Some(vec!["Egress".to_owned()]), + ..Default::default() + }), + }]); + } + network_policies + } + + fn create_service_account(&self, environment: &Environment) -> Option { + if environment.create_service_account() { + let mut annotations: BTreeMap = environment + .spec + .service_account_annotations + .clone() + .unwrap_or_default(); + if let (CloudProvider::Aws, Some(role_arn)) = ( + self.config.cloud_provider, + environment + .spec + .environmentd_iam_role_arn + .as_deref() + .or(self.config.environmentd_iam_role_arn.as_deref()), + ) { + warn!( + "Use of Materialize.spec.environmentd_iam_role_arn is deprecated. Please set \"eks.amazonaws.com/role-arn\" in Materialize.spec.service_account_annotations instead." + ); + annotations.insert( + "eks.amazonaws.com/role-arn".to_string(), + role_arn.to_string(), + ); + }; + + let mut labels = environment.default_labels(); + labels.extend( + environment + .spec + .service_account_labels + .clone() + .unwrap_or_default(), + ); + + Some(ServiceAccount { + metadata: ObjectMeta { + annotations: Some(annotations), + labels: Some(labels), + ..environment.managed_resource_meta(environment.service_account_name()) + }, + ..Default::default() + }) + } else { + None + } + } + + fn create_role(&self, environment: &Environment) -> Role { + Role { + metadata: environment.managed_resource_meta(environment.role_name()), + rules: Some(vec![ + PolicyRule { + api_groups: Some(vec!["apps".to_string()]), + resources: Some(vec!["statefulsets".to_string()]), + verbs: vec![ + "get".to_string(), + "list".to_string(), + "watch".to_string(), + "create".to_string(), + "update".to_string(), + "patch".to_string(), + "delete".to_string(), + ], + ..Default::default() + }, + PolicyRule { + api_groups: Some(vec!["".to_string()]), + resources: Some(vec![ + "persistentvolumeclaims".to_string(), + "pods".to_string(), + "secrets".to_string(), + "services".to_string(), + ]), + verbs: vec![ + "get".to_string(), + "list".to_string(), + "watch".to_string(), + "create".to_string(), + "update".to_string(), + "patch".to_string(), + "delete".to_string(), + ], + ..Default::default() + }, + PolicyRule { + api_groups: Some(vec!["".to_string()]), + resources: Some(vec!["configmaps".to_string()]), + verbs: vec!["get".to_string()], + ..Default::default() + }, + PolicyRule { + api_groups: Some(vec!["materialize.cloud".to_string()]), + resources: Some(vec!["vpcendpoints".to_string()]), + verbs: vec![ + "get".to_string(), + "list".to_string(), + "watch".to_string(), + "create".to_string(), + "update".to_string(), + "patch".to_string(), + "delete".to_string(), + ], + ..Default::default() + }, + PolicyRule { + api_groups: Some(vec!["metrics.k8s.io".to_string()]), + resources: Some(vec!["pods".to_string()]), + verbs: vec!["get".to_string(), "list".to_string()], + ..Default::default() + }, + PolicyRule { + api_groups: Some(vec!["custom.metrics.k8s.io".to_string()]), + resources: Some(vec![ + "persistentvolumeclaims/kubelet_volume_stats_used_bytes".to_string(), + "persistentvolumeclaims/kubelet_volume_stats_capacity_bytes".to_string(), + ]), + verbs: vec!["get".to_string()], + ..Default::default() + }, + ]), + } + } + + fn create_role_binding(&self, environment: &Environment) -> RoleBinding { + RoleBinding { + metadata: environment.managed_resource_meta(environment.role_binding_name()), + role_ref: RoleRef { + api_group: "".to_string(), + kind: "Role".to_string(), + name: environment.role_name(), + }, + subjects: Some(vec![Subject { + api_group: Some("".to_string()), + kind: "ServiceAccount".to_string(), + name: environment.service_account_name(), + namespace: Some(environment.namespace()), + }]), + } + } + + fn create_certificate(&self, environment: &Environment) -> Option { + create_certificate( + self.config.default_certificate_specs.internal.clone(), + environment, + environment.spec.internal_certificate_spec.clone(), + environment.certificate_name(), + environment.certificate_secret_name(), + Some(vec![ + environment.service_name(), + environment.service_internal_fqdn(), + ]), + CertificatePrivateKeyAlgorithm::Ed25519, + None, + ) + } +} + +#[async_trait::async_trait] +impl k8s_controller::Context for Context { + type Resource = Environment; + type Error = Error; + + #[instrument(fields())] + async fn apply( + &self, + client: Client, + environment: &Self::Resource, + ) -> Result, Self::Error> { + if environment.status.is_none() { + let environment_api: Api = Api::namespaced( + client.clone(), + &environment.meta().namespace.clone().unwrap(), + ); + let mut new_environment = environment.clone(); + new_environment.status = Some(environment.status()); + environment_api + .replace_status( + &environment.name_unchecked(), + &PostParams::default(), + serde_json::to_vec(&new_environment).unwrap(), + ) + .await?; + // Updating the status should trigger a reconciliation + // which will include a status this time. + return Ok(None); + } + + let namespace = environment.namespace(); + let network_policy_api: Api = Api::namespaced(client.clone(), &namespace); + let service_account_api: Api = Api::namespaced(client.clone(), &namespace); + let role_api: Api = Api::namespaced(client.clone(), &namespace); + let role_binding_api: Api = Api::namespaced(client.clone(), &namespace); + let certificate_api: Api = Api::namespaced(client.clone(), &namespace); + + for policy in self.create_network_policies(environment) { + trace!("applying network policy {}", policy.name_unchecked()); + apply_resource(&network_policy_api, &policy).await?; + } + + if let Some(service_account) = self.create_service_account(environment) { + trace!("applying environmentd service account"); + apply_resource(&service_account_api, &service_account).await?; + } + + trace!("applying environmentd role"); + apply_resource(&role_api, &self.create_role(environment)).await?; + + trace!("applying environmentd role binding"); + apply_resource(&role_binding_api, &self.create_role_binding(environment)).await?; + + if let Some(certificate) = self.create_certificate(environment) { + trace!("creating new environmentd certificate"); + apply_resource(&certificate_api, &certificate).await?; + } + + Ok(None) + } +} diff --git a/src/orchestratord/src/k8s.rs b/src/orchestratord/src/k8s.rs index 2688fccb78284..bb25fbbc109b4 100644 --- a/src/orchestratord/src/k8s.rs +++ b/src/orchestratord/src/k8s.rs @@ -94,6 +94,10 @@ pub async fn register_crds( crds: vec![mz_crd], stored_version: String::from("v1alpha1"), }, + VersionedCrd { + crds: vec![crd::environment::v1alpha1::Environment::crd()], + stored_version: String::from("v1alpha1"), + }, VersionedCrd { crds: vec![crd::balancer::v1alpha1::Balancer::crd()], stored_version: String::from("v1alpha1"), From a69bd3495a255cdc4e16b2acdd01f567bc2fd1db Mon Sep 17 00:00:00 2001 From: Jesse Luehrs Date: Wed, 17 Dec 2025 17:58:25 -0500 Subject: [PATCH 2/2] remove global environment resource creation from materialize operator --- src/orchestratord/src/bin/orchestratord.rs | 10 - .../src/controller/materialize.rs | 24 +- .../controller/materialize/environmentd.rs | 402 +----------------- 3 files changed, 20 insertions(+), 416 deletions(-) diff --git a/src/orchestratord/src/bin/orchestratord.rs b/src/orchestratord/src/bin/orchestratord.rs index 0ed8cb5761533..2747771f0adbc 100644 --- a/src/orchestratord/src/bin/orchestratord.rs +++ b/src/orchestratord/src/bin/orchestratord.rs @@ -322,7 +322,6 @@ async fn run(args: Args) -> Result<(), anyhow::Error> { console_image_tag_default: args.console_image_tag_default, console_image_tag_map: args.console_image_tag_map, aws_account_id: args.aws_account_id, - environmentd_iam_role_arn: args.environmentd_iam_role_arn.clone(), environmentd_connection_role_arn: args.environmentd_connection_role_arn, aws_secrets_controller_tags: args.aws_secrets_controller_tags, environmentd_availability_zones: args.environmentd_availability_zones, @@ -331,9 +330,6 @@ async fn run(args: Args) -> Result<(), anyhow::Error> { enable_security_context: args.enable_security_context, enable_internal_statement_logging: args.enable_internal_statement_logging, disable_statement_logging: args.disable_statement_logging, - orchestratord_pod_selector_labels: args - .orchestratord_pod_selector_labels - .clone(), environmentd_node_selector: args.environmentd_node_selector, environmentd_affinity: args.environmentd_affinity, environmentd_tolerations: args.environmentd_tolerations, @@ -342,11 +338,6 @@ async fn run(args: Args) -> Result<(), anyhow::Error> { clusterd_affinity: args.clusterd_affinity, clusterd_tolerations: args.clusterd_tolerations, image_pull_policy: args.image_pull_policy, - network_policies_internal_enabled: args.network_policies_internal_enabled, - network_policies_ingress_enabled: args.network_policies_ingress_enabled, - network_policies_ingress_cidrs: args.network_policies_ingress_cidrs.clone(), - network_policies_egress_enabled: args.network_policies_egress_enabled, - network_policies_egress_cidrs: args.network_policies_egress_cidrs.clone(), environmentd_cluster_replica_sizes: args.environmentd_cluster_replica_sizes, bootstrap_default_cluster_replica_size: args .bootstrap_default_cluster_replica_size, @@ -379,7 +370,6 @@ async fn run(args: Args) -> Result<(), anyhow::Error> { default_certificate_specs: args.default_certificate_specs.clone(), disable_license_key_checks: args.disable_license_key_checks, tracing: args.tracing, - orchestratord_namespace: namespace.clone(), }, Arc::clone(&metrics), client.clone(), diff --git a/src/orchestratord/src/controller/materialize.rs b/src/orchestratord/src/controller/materialize.rs index 224cdc0efdfaf..796a2806aeb72 100644 --- a/src/orchestratord/src/controller/materialize.rs +++ b/src/orchestratord/src/controller/materialize.rs @@ -40,6 +40,7 @@ use mz_cloud_resources::crd::{ ManagedResource, balancer::v1alpha1::{Balancer, BalancerSpec}, console::v1alpha1::{BalancerdRef, Console, ConsoleSpec, HttpConnectionScheme}, + environment::v1alpha1::{Environment, EnvironmentSpec}, materialize::v1alpha1::{Materialize, MaterializeRolloutStrategy, MaterializeStatus}, }; use mz_license_keys::validate; @@ -66,7 +67,6 @@ pub struct Config { pub console_image_tag_map: Vec>, pub aws_account_id: Option, - pub environmentd_iam_role_arn: Option, pub environmentd_connection_role_arn: Option, pub aws_secrets_controller_tags: Vec, pub environmentd_availability_zones: Option>, @@ -77,7 +77,6 @@ pub struct Config { pub enable_internal_statement_logging: bool, pub disable_statement_logging: bool, - pub orchestratord_pod_selector_labels: Vec>, pub environmentd_node_selector: Vec>, pub environmentd_affinity: Option, pub environmentd_tolerations: Option>, @@ -86,11 +85,6 @@ pub struct Config { pub clusterd_affinity: Option, pub clusterd_tolerations: Option>, pub image_pull_policy: KubernetesImagePullPolicy, - pub network_policies_internal_enabled: bool, - pub network_policies_ingress_enabled: bool, - pub network_policies_ingress_cidrs: Vec, - pub network_policies_egress_enabled: bool, - pub network_policies_egress_cidrs: Vec, pub environmentd_cluster_replica_sizes: Option, pub bootstrap_default_cluster_replica_size: Option, @@ -118,7 +112,6 @@ pub struct Config { pub disable_license_key_checks: bool, pub tracing: TracingCliArgs, - pub orchestratord_namespace: String, } pub struct Context { @@ -272,6 +265,7 @@ impl k8s_controller::Context for Context { mz: &Self::Resource, ) -> Result, Self::Error> { let mz_api: Api = Api::namespaced(client.clone(), &mz.namespace()); + let environment_api: Api = Api::namespaced(client.clone(), &mz.namespace()); let balancer_api: Api = Api::namespaced(client.clone(), &mz.namespace()); let console_api: Api = Api::namespaced(client.clone(), &mz.namespace()); let secret_api: Api = Api::namespaced(client.clone(), &mz.namespace()); @@ -360,6 +354,20 @@ impl k8s_controller::Context for Context { self.check_environment_id_conflicts(mz)?; + let environment = Environment { + metadata: mz.managed_resource_meta(mz.name_unchecked()), + spec: EnvironmentSpec { + environmentd_iam_role_arn: mz.spec.environmentd_iam_role_arn.clone(), + service_account_name: mz.spec.service_account_name.clone(), + service_account_annotations: mz.spec.service_account_annotations.clone(), + service_account_labels: mz.spec.service_account_labels.clone(), + internal_certificate_spec: mz.spec.internal_certificate_spec.clone(), + resource_id: Some(status.resource_id.clone()), + }, + status: None, + }; + apply_resource(&environment_api, &environment).await?; + // we compare the hash against the environment resources generated // for the current active generation, since that's what we expect to // have been applied earlier, but we don't want to use these diff --git a/src/orchestratord/src/controller/materialize/environmentd.rs b/src/orchestratord/src/controller/materialize/environmentd.rs index af82e11bca95f..d0958ee247319 100644 --- a/src/orchestratord/src/controller/materialize/environmentd.rs +++ b/src/orchestratord/src/controller/materialize/environmentd.rs @@ -21,14 +21,8 @@ use k8s_openapi::{ Capabilities, ConfigMap, ConfigMapVolumeSource, Container, ContainerPort, EnvVar, EnvVarSource, KeyToPath, PodSecurityContext, PodSpec, PodTemplateSpec, Probe, SeccompProfile, Secret, SecretKeySelector, SecretVolumeSource, SecurityContext, - Service, ServiceAccount, ServicePort, ServiceSpec, TCPSocketAction, Toleration, Volume, - VolumeMount, + Service, ServicePort, ServiceSpec, TCPSocketAction, Toleration, Volume, VolumeMount, }, - networking::v1::{ - IPBlock, NetworkPolicy, NetworkPolicyEgressRule, NetworkPolicyIngressRule, - NetworkPolicyPeer, NetworkPolicyPort, NetworkPolicySpec, - }, - rbac::v1::{PolicyRule, Role, RoleBinding, RoleRef, Subject}, }, apimachinery::pkg::{apis::meta::v1::LabelSelector, util::intstr::IntOrString}, }; @@ -42,18 +36,14 @@ use reqwest::{Client as HttpClient, StatusCode}; use semver::{BuildMetadata, Prerelease, Version}; use serde::{Deserialize, Serialize}; use sha2::{Digest, Sha256}; -use tracing::{error, trace, warn}; +use tracing::{error, trace}; use super::Error; use super::matching_image_from_environmentd_image_ref; use crate::k8s::{apply_resource, delete_resource, get_resource}; -use crate::tls::{create_certificate, issuer_ref_defined}; +use crate::tls::issuer_ref_defined; use mz_cloud_provider::CloudProvider; -use mz_cloud_resources::crd::materialize::v1alpha1::Materialize; -use mz_cloud_resources::crd::{ - ManagedResource, - generated::cert_manager::certificates::{Certificate, CertificatePrivateKeyAlgorithm}, -}; +use mz_cloud_resources::crd::{ManagedResource, materialize::v1alpha1::Materialize}; use mz_ore::instrument; static V140_DEV0: LazyLock = LazyLock::new(|| Version { @@ -129,30 +119,19 @@ pub struct ConnectionInfo { #[derive(Debug, Serialize)] pub struct Resources { pub generation: u64, - pub environmentd_network_policies: Vec, - pub service_account: Box>, - pub role: Box, - pub role_binding: Box, pub public_service: Box, pub generation_service: Box, pub persist_pubsub_service: Box, - pub environmentd_certificate: Box>, pub environmentd_statefulset: Box, pub connection_info: Box, } impl Resources { pub fn new(config: &super::Config, mz: &Materialize, generation: u64) -> Self { - let environmentd_network_policies = create_environmentd_network_policies(config, mz); - - let service_account = Box::new(create_service_account_object(config, mz)); - let role = Box::new(create_role_object(mz)); - let role_binding = Box::new(create_role_binding_object(mz)); let public_service = Box::new(create_public_service_object(config, mz, generation)); let generation_service = Box::new(create_generation_service_object(config, mz, generation)); let persist_pubsub_service = Box::new(create_persist_pubsub_service(config, mz, generation)); - let environmentd_certificate = Box::new(create_environmentd_certificate(config, mz)); let environmentd_statefulset = Box::new(create_environmentd_statefulset_object( config, mz, generation, )); @@ -160,14 +139,9 @@ impl Resources { Self { generation, - environmentd_network_policies, - service_account, - role, - role_binding, public_service, generation_service, persist_pubsub_service, - environmentd_certificate, environmentd_statefulset, connection_info, } @@ -180,43 +154,16 @@ impl Resources { force_promote: bool, namespace: &str, ) -> Result, Error> { - let environmentd_network_policy_api: Api = - Api::namespaced(client.clone(), namespace); let service_api: Api = Api::namespaced(client.clone(), namespace); - let service_account_api: Api = Api::namespaced(client.clone(), namespace); - let role_api: Api = Api::namespaced(client.clone(), namespace); - let role_binding_api: Api = Api::namespaced(client.clone(), namespace); let statefulset_api: Api = Api::namespaced(client.clone(), namespace); - let certificate_api: Api = Api::namespaced(client.clone(), namespace); let configmap_api: Api = Api::namespaced(client.clone(), namespace); - for policy in &self.environmentd_network_policies { - trace!("applying network policy {}", policy.name_unchecked()); - apply_resource(&environmentd_network_policy_api, policy).await?; - } - - if let Some(service_account) = &*self.service_account { - trace!("applying environmentd service account"); - apply_resource(&service_account_api, service_account).await?; - } - - trace!("applying environmentd role"); - apply_resource(&role_api, &*self.role).await?; - - trace!("applying environmentd role binding"); - apply_resource(&role_binding_api, &*self.role_binding).await?; - trace!("applying environmentd per-generation service"); apply_resource(&service_api, &*self.generation_service).await?; trace!("creating persist pubsub service"); apply_resource(&service_api, &*self.persist_pubsub_service).await?; - if let Some(certificate) = &*self.environmentd_certificate { - trace!("creating new environmentd certificate"); - apply_resource(&certificate_api, certificate).await?; - } - trace!("applying listeners configmap"); apply_resource(&configmap_api, &self.connection_info.listeners_configmap).await?; @@ -471,328 +418,6 @@ impl Resources { } } -fn create_environmentd_network_policies( - config: &super::Config, - mz: &Materialize, -) -> Vec { - let mut network_policies = Vec::new(); - if config.network_policies_internal_enabled { - let environmentd_label_selector = LabelSelector { - match_labels: Some( - mz.default_labels() - .into_iter() - .chain([( - "materialize.cloud/app".to_owned(), - mz.environmentd_app_name(), - )]) - .collect(), - ), - ..Default::default() - }; - let orchestratord_label_selector = LabelSelector { - match_labels: Some( - config - .orchestratord_pod_selector_labels - .iter() - .cloned() - .map(|kv| (kv.key, kv.value)) - .collect(), - ), - ..Default::default() - }; - // TODO (Alex) filter to just clusterd and environmentd, - // once we get a consistent set of labels for both. - let all_pods_label_selector = LabelSelector { - // TODO: can't use default_labels() here because it needs to be - // consistent between balancer and materialize resources, and - // materialize resources have additional labels - we should - // figure out something better here (probably balancers should - // install their own network policies) - match_labels: Some( - [( - "materialize.cloud/mz-resource-id".to_owned(), - mz.resource_id().to_owned(), - )] - .into(), - ), - ..Default::default() - }; - network_policies.extend([ - // Allow all clusterd/environmentd traffic (between pods in the - // same environment) - NetworkPolicy { - metadata: mz - .managed_resource_meta(mz.name_prefixed("allow-all-within-environment")), - spec: Some(NetworkPolicySpec { - egress: Some(vec![NetworkPolicyEgressRule { - to: Some(vec![NetworkPolicyPeer { - pod_selector: Some(all_pods_label_selector.clone()), - ..Default::default() - }]), - ..Default::default() - }]), - ingress: Some(vec![NetworkPolicyIngressRule { - from: Some(vec![NetworkPolicyPeer { - pod_selector: Some(all_pods_label_selector.clone()), - ..Default::default() - }]), - ..Default::default() - }]), - pod_selector: Some(all_pods_label_selector.clone()), - policy_types: Some(vec!["Ingress".to_owned(), "Egress".to_owned()]), - ..Default::default() - }), - }, - // Allow traffic from orchestratord to environmentd in order to hit - // the promotion endpoints during upgrades - NetworkPolicy { - metadata: mz.managed_resource_meta(mz.name_prefixed("allow-orchestratord")), - spec: Some(NetworkPolicySpec { - ingress: Some(vec![NetworkPolicyIngressRule { - from: Some(vec![NetworkPolicyPeer { - namespace_selector: Some(LabelSelector { - match_labels: Some(btreemap! { - "kubernetes.io/metadata.name".into() - => config.orchestratord_namespace.clone(), - }), - ..Default::default() - }), - pod_selector: Some(orchestratord_label_selector), - ..Default::default() - }]), - ports: Some(vec![ - NetworkPolicyPort { - port: Some(IntOrString::Int(config.environmentd_http_port.into())), - protocol: Some("TCP".to_string()), - ..Default::default() - }, - NetworkPolicyPort { - port: Some(IntOrString::Int( - config.environmentd_internal_http_port.into(), - )), - protocol: Some("TCP".to_string()), - ..Default::default() - }, - ]), - ..Default::default() - }]), - pod_selector: Some(environmentd_label_selector), - policy_types: Some(vec!["Ingress".to_owned()]), - ..Default::default() - }), - }, - ]); - } - if config.network_policies_ingress_enabled { - let mut ingress_label_selector = mz.default_labels(); - ingress_label_selector.insert("materialize.cloud/app".to_owned(), mz.balancerd_app_name()); - network_policies.extend([NetworkPolicy { - metadata: mz.managed_resource_meta(mz.name_prefixed("sql-and-http-ingress")), - spec: Some(NetworkPolicySpec { - ingress: Some(vec![NetworkPolicyIngressRule { - from: Some( - config - .network_policies_ingress_cidrs - .iter() - .map(|cidr| NetworkPolicyPeer { - ip_block: Some(IPBlock { - cidr: cidr.to_owned(), - except: None, - }), - ..Default::default() - }) - .collect(), - ), - ports: Some(vec![ - NetworkPolicyPort { - port: Some(IntOrString::Int(config.environmentd_http_port.into())), - protocol: Some("TCP".to_string()), - ..Default::default() - }, - NetworkPolicyPort { - port: Some(IntOrString::Int(config.environmentd_sql_port.into())), - protocol: Some("TCP".to_string()), - ..Default::default() - }, - ]), - ..Default::default() - }]), - pod_selector: Some(LabelSelector { - match_expressions: None, - match_labels: Some(ingress_label_selector), - }), - policy_types: Some(vec!["Ingress".to_owned()]), - ..Default::default() - }), - }]); - } - if config.network_policies_egress_enabled { - network_policies.extend([NetworkPolicy { - metadata: mz.managed_resource_meta(mz.name_prefixed("sources-and-sinks-egress")), - spec: Some(NetworkPolicySpec { - egress: Some(vec![NetworkPolicyEgressRule { - to: Some( - config - .network_policies_egress_cidrs - .iter() - .map(|cidr| NetworkPolicyPeer { - ip_block: Some(IPBlock { - cidr: cidr.to_owned(), - except: None, - }), - ..Default::default() - }) - .collect(), - ), - ..Default::default() - }]), - pod_selector: Some(LabelSelector { - match_expressions: None, - match_labels: Some(mz.default_labels()), - }), - policy_types: Some(vec!["Egress".to_owned()]), - ..Default::default() - }), - }]); - } - network_policies -} - -fn create_service_account_object( - config: &super::Config, - mz: &Materialize, -) -> Option { - if mz.create_service_account() { - let mut annotations: BTreeMap = mz - .spec - .service_account_annotations - .clone() - .unwrap_or_default(); - if let (CloudProvider::Aws, Some(role_arn)) = ( - config.cloud_provider, - mz.spec - .environmentd_iam_role_arn - .as_deref() - .or(config.environmentd_iam_role_arn.as_deref()), - ) { - warn!( - "Use of Materialize.spec.environmentd_iam_role_arn is deprecated. Please set \"eks.amazonaws.com/role-arn\" in Materialize.spec.service_account_annotations instead." - ); - annotations.insert( - "eks.amazonaws.com/role-arn".to_string(), - role_arn.to_string(), - ); - }; - - let mut labels = mz.default_labels(); - labels.extend(mz.spec.service_account_labels.clone().unwrap_or_default()); - - Some(ServiceAccount { - metadata: ObjectMeta { - annotations: Some(annotations), - labels: Some(labels), - ..mz.managed_resource_meta(mz.service_account_name()) - }, - ..Default::default() - }) - } else { - None - } -} - -fn create_role_object(mz: &Materialize) -> Role { - Role { - metadata: mz.managed_resource_meta(mz.role_name()), - rules: Some(vec![ - PolicyRule { - api_groups: Some(vec!["apps".to_string()]), - resources: Some(vec!["statefulsets".to_string()]), - verbs: vec![ - "get".to_string(), - "list".to_string(), - "watch".to_string(), - "create".to_string(), - "update".to_string(), - "patch".to_string(), - "delete".to_string(), - ], - ..Default::default() - }, - PolicyRule { - api_groups: Some(vec!["".to_string()]), - resources: Some(vec![ - "persistentvolumeclaims".to_string(), - "pods".to_string(), - "secrets".to_string(), - "services".to_string(), - ]), - verbs: vec![ - "get".to_string(), - "list".to_string(), - "watch".to_string(), - "create".to_string(), - "update".to_string(), - "patch".to_string(), - "delete".to_string(), - ], - ..Default::default() - }, - PolicyRule { - api_groups: Some(vec!["".to_string()]), - resources: Some(vec!["configmaps".to_string()]), - verbs: vec!["get".to_string()], - ..Default::default() - }, - PolicyRule { - api_groups: Some(vec!["materialize.cloud".to_string()]), - resources: Some(vec!["vpcendpoints".to_string()]), - verbs: vec![ - "get".to_string(), - "list".to_string(), - "watch".to_string(), - "create".to_string(), - "update".to_string(), - "patch".to_string(), - "delete".to_string(), - ], - ..Default::default() - }, - PolicyRule { - api_groups: Some(vec!["metrics.k8s.io".to_string()]), - resources: Some(vec!["pods".to_string()]), - verbs: vec!["get".to_string(), "list".to_string()], - ..Default::default() - }, - PolicyRule { - api_groups: Some(vec!["custom.metrics.k8s.io".to_string()]), - resources: Some(vec![ - "persistentvolumeclaims/kubelet_volume_stats_used_bytes".to_string(), - "persistentvolumeclaims/kubelet_volume_stats_capacity_bytes".to_string(), - ]), - verbs: vec!["get".to_string()], - ..Default::default() - }, - ]), - } -} - -fn create_role_binding_object(mz: &Materialize) -> RoleBinding { - RoleBinding { - metadata: mz.managed_resource_meta(mz.role_binding_name()), - role_ref: RoleRef { - api_group: "".to_string(), - kind: "Role".to_string(), - name: mz.role_name(), - }, - subjects: Some(vec![Subject { - api_group: Some("".to_string()), - kind: "ServiceAccount".to_string(), - name: mz.service_account_name(), - namespace: Some(mz.namespace()), - }]), - } -} - fn create_public_service_object( config: &super::Config, mz: &Materialize, @@ -889,25 +514,6 @@ fn create_persist_pubsub_service( } } -fn create_environmentd_certificate( - config: &super::Config, - mz: &Materialize, -) -> Option { - create_certificate( - config.default_certificate_specs.internal.clone(), - mz, - mz.spec.internal_certificate_spec.clone(), - mz.environmentd_certificate_name(), - mz.environmentd_certificate_secret_name(), - Some(vec![ - mz.environmentd_service_name(), - mz.environmentd_service_internal_fqdn(), - ]), - CertificatePrivateKeyAlgorithm::Ed25519, - None, - ) -} - fn create_environmentd_statefulset_object( config: &super::Config, mz: &Materialize,