Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
f939934
feat: Support objectOverrides
sbernauer Dec 23, 2025
d6b4c95
update to newer op-rs
sbernauer Dec 23, 2025
49848cf
changelog
sbernauer Dec 23, 2025
fe99463
Adding support for multiple git-sync repos
Maleware Jan 4, 2026
65e39a9
Merge branch 'main' into feat/multi-git-sync
Maleware Jan 4, 2026
2cbae70
Cleaning up
Maleware Jan 4, 2026
3ea363a
Merge branch 'feat/multi-git-sync' of github.com:stackabletech/airflo…
Maleware Jan 4, 2026
26c11a7
makeing pre-commit happy
Maleware Jan 7, 2026
7c7f25f
Adding check if cm is mounted into core folder, if omit mkdir
Maleware Jan 27, 2026
ffa249a
store wip
Maleware Feb 13, 2026
984d7c1
working example, clean up
Maleware Feb 24, 2026
5021b33
Merge branch 'main' into feat/multi-git-sync
Maleware Feb 24, 2026
6538c67
update to v1alpha2
Maleware Feb 24, 2026
0e0b09f
Add CHANGELOG.md
Maleware Feb 24, 2026
04ec315
Adding docs draft
Maleware Feb 24, 2026
3e2c438
Better documentation
Maleware Feb 24, 2026
010bfd3
Refactor strings into constants
Maleware Feb 24, 2026
50427cb
Further removing strings and consolidate in constant
Maleware Feb 24, 2026
a54f9e5
Improve comments and refactor
Maleware Feb 25, 2026
8b2f4b6
Add two more comments
Maleware Feb 25, 2026
61dc8ce
Another comment
Maleware Feb 25, 2026
7e80e9a
removing add_env for dags-init container
Maleware Feb 25, 2026
d9558ea
consistent names across init containers
Maleware Feb 25, 2026
c5befaf
add only if git-sync is not empty in pythonpath
Maleware Feb 27, 2026
c2b6e44
Updating integration test mount-dags-gitsync to use two git-syncs
Maleware Feb 27, 2026
cf8b245
making pre-commit happy again
Maleware Feb 27, 2026
1c188c5
again, pre-commit for dag-metrics.py
Maleware Feb 27, 2026
db8ecbd
rework typos
Maleware Feb 27, 2026
8db36d6
better documentation for multi-git-sync
Maleware Feb 27, 2026
c7052a8
Merge branch 'main' into feat/multi-git-sync
Maleware Feb 27, 2026
1b43f0d
again pre-commit hook was unhappy
Maleware Feb 27, 2026
bd8d7a4
Merge branch 'feat/multi-git-sync' of github.com:stackabletech/airflo…
Maleware Feb 27, 2026
ff8bb20
Add test print I forgot
Maleware Feb 27, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

### Added

- Add git-sync support for multiple repositories ([#729]).
- Add support for airflow 3.1.6 ([#742]).
- Add operator versioning ([#725]).
- GitSync considered for v1alpha1 and v1alpha2
Expand All @@ -21,6 +22,7 @@
- Prevent unnecessary Pod restarts when initially creating an AirflowCluster.
This is achieved by applying the StatefulSet after all ConfigMaps and Secrets that it mounts ([#734]).

[#729]: https://github.com/stackabletech/airflow-operator/pull/729
[#725]: https://github.com/stackabletech/airflow-operator/pull/725
[#726]: https://github.com/stackabletech/airflow-operator/pull/726
[#727]: https://github.com/stackabletech/airflow-operator/pull/727
Expand Down
23 changes: 22 additions & 1 deletion docs/modules/airflow/pages/usage-guide/mounting-dags.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ For multiple DAGs, it is easier to expose them via `gitsync`, as shown below.

== Via `git-sync`

{git-sync}[git-sync] is a command that pulls a git repository into a local directory and is supplied as a sidecar container for use within Kubernetes.
{git-sync}[git-sync] is a command that pulls a git repository into a local directory and is supplied as a sidecar container for use within Kubernetes. {git-sync}[git-sync] folders will be provided at `/stackable/app/allDAGs/current-i`, with i in {0,1,..,n-1}.
The Stackable Airflow images already ship with git-sync included, and the operator takes care of calling the tool and mounting volumes, so that only the repository and synchronization details are required:

.git-sync usage example: https
Expand Down Expand Up @@ -82,3 +82,24 @@ include::example$example-airflow-gitsync-ssh.yaml[]

NOTE: git-sync can be used with DAGs that make use of Python modules, as Python is configured to use the git-sync target folder as the "root" location when looking for referenced files.
See the xref:usage-guide/applying-custom-resources.adoc[] example for more details.

=== Multiple repositories via git-sync

If you want to access multiple branches of a repository or load dags from multiple repositories, you can extend the list shown above.

The default mount-path for git-sync resources is `/stackable/app/allDAGs/current-{i}` where i corresponds to:

[source,yaml]
----
dagsGitSync:
- repo: ssh://git@github.com/stackable-airflow/dags.git # <-- current-0
branch: test/git-sync-0
credentials:
sshPrivateKeySecretName: git-sync-ssh
- repo: ssh://git@github.com/stackable-airflow/dags.git # <-- current-1
branch: test/git-sync-1
credentials:
sshPrivateKeySecretName: git-sync-ssh
----

NOTE: Using DAG's from ConfigMaps will need you to either use celeryExecutors and mount them under `/stackable/app/allDAGs/<name>` or use kubernetesExecutor and mount them somewhere else and change the PYTHONPATH as well as the `AIRFLOW\__CORE__DAGS_FOLDER` accordingly.
44 changes: 39 additions & 5 deletions rust/operator-binary/src/airflow_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,9 @@ use crate::{
config::{self, PYTHON_IMPORTS},
controller_commons::{self, CONFIG_VOLUME_NAME, LOG_CONFIG_VOLUME_NAME, LOG_VOLUME_NAME},
crd::{
self, AIRFLOW_CONFIG_FILENAME, APP_NAME, AirflowClusterStatus, AirflowConfig,
AirflowConfigOptions, AirflowExecutor, AirflowRole, CONFIG_PATH, Container, ExecutorConfig,
ExecutorConfigFragment, HTTP_PORT, HTTP_PORT_NAME, LISTENER_VOLUME_DIR,
self, AIRFLOW_CONFIG_FILENAME, AIRFLOW_DAGS_FOLDER, APP_NAME, AirflowClusterStatus,
AirflowConfig, AirflowConfigOptions, AirflowExecutor, AirflowRole, CONFIG_PATH, Container,
ExecutorConfig, ExecutorConfigFragment, HTTP_PORT, HTTP_PORT_NAME, LISTENER_VOLUME_DIR,
LISTENER_VOLUME_NAME, LOG_CONFIG_DIR, METRICS_PORT, METRICS_PORT_NAME, OPERATOR_NAME,
STACKABLE_LOG_DIR, TEMPLATE_LOCATION, TEMPLATE_NAME, TEMPLATE_VOLUME_NAME,
authentication::{
Expand Down Expand Up @@ -113,6 +113,7 @@ use crate::{
},
};

pub const AIRFLOW_DAGS_INIT_VOLUME: &str = "all-dags-volume";
pub const AIRFLOW_CONTROLLER_NAME: &str = "airflowcluster";
pub const DOCKER_IMAGE_BASE_NAME: &str = "airflow";
pub const AIRFLOW_FULL_CONTROLLER_NAME: &str =
Expand Down Expand Up @@ -1019,7 +1020,6 @@ fn build_server_rolegroup_statefulset(
executor,
authentication_config,
authorization_config,
git_sync_resources,
resolved_product_image,
)
.context(BuildStatefulsetEnvVarsSnafu)?,
Expand Down Expand Up @@ -1299,7 +1299,6 @@ fn build_executor_template_config_map(
airflow,
env_overrides,
merged_executor_config,
git_sync_resources,
resolved_product_image,
))
.add_volume_mounts(airflow.volume_mounts())
Expand All @@ -1319,6 +1318,41 @@ fn build_executor_template_config_map(
true,
)?;

/*
Multi-gitsync requires an init-container which copies contents from
gitsync volumes into one empty volume to be mounted into
the airflow image.
*/
let mut dags_init_container =
ContainerBuilder::new("multi-git-sync-init").context(InvalidContainerNameSnafu)?;
dags_init_container
.image_from_product_image(resolved_product_image)
.args(airflow.get_kubernetes_executer_multi_gitsync_commands())
.command(vec![
"/bin/bash".to_string(),
"-x".to_string(),
"-euo".to_string(),
"pipefail".to_string(),
"-c".to_string(),
])
.add_volume_mounts(git_sync_resources.git_content_volume_mounts.clone())
.context(AddVolumeMountSnafu)?
.add_volume_mount(
AIRFLOW_DAGS_INIT_VOLUME.to_owned(),
AIRFLOW_DAGS_FOLDER.to_owned(),
)
.context(AddVolumeMountSnafu)?;

airflow_container
.add_volume_mount(
AIRFLOW_DAGS_INIT_VOLUME.to_owned(),
AIRFLOW_DAGS_FOLDER.to_owned(),
)
.context(AddVolumeMountSnafu)?;

pb.add_init_container(dags_init_container.build());
pb.add_volume(VolumeBuilder::new(AIRFLOW_DAGS_INIT_VOLUME.to_owned()).build())
.context(AddVolumeSnafu)?;
pb.add_container(airflow_container.build());
pb.add_volumes(airflow.volumes().clone())
.context(AddVolumeSnafu)?;
Expand Down
40 changes: 40 additions & 0 deletions rust/operator-binary/src/crd/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ pub const STACKABLE_LOG_DIR: &str = "/stackable/log";
pub const LOG_CONFIG_DIR: &str = "/stackable/app/log_config";
pub const AIRFLOW_HOME: &str = "/stackable/airflow";
pub const AIRFLOW_CONFIG_FILENAME: &str = "webserver_config.py";
pub const AIRFLOW_DAGS_FOLDER: &str = "/stackable/app/allDAGs";

pub const TEMPLATE_VOLUME_NAME: &str = "airflow-executor-pod-template";
pub const TEMPLATE_LOCATION: &str = "/templates";
Expand Down Expand Up @@ -439,6 +440,41 @@ impl v1alpha2::AirflowCluster {
fragment::validate(conf_rolegroup).context(FragmentValidationFailureSnafu)
}

// Softlink from each single folder git-{i} into {AIRFLOW_DAGS_FOLDER}/.
pub fn get_multi_gitsync_commands(&self) -> Vec<String> {
let mut symlinks = Vec::<String>::new();
for (i, _) in self.spec.cluster_config.dags_git_sync.iter().enumerate() {
symlinks.push(
format!("ln -s /stackable/app/git-{i}/current {AIRFLOW_DAGS_FOLDER}/current-{i}")
.to_string(),
)
}
symlinks
}

// kubernetesExecuter needs copy since it needs init-container.
pub fn get_kubernetes_executer_multi_gitsync_commands(&self) -> Vec<String> {
let mut cp_commands = Vec::<String>::new();
for (i, _) in self.spec.cluster_config.dags_git_sync.iter().enumerate() {
cp_commands.push(
format!("cp -r /stackable/app/git-{i}/current/ {AIRFLOW_DAGS_FOLDER}/current-{i}")
.to_string(),
);
}
// init-container seems to only accept one command line.
vec![cp_commands.join(" && ")]
}

// PYTHONPATH contains folder-name provided in CRD.
pub fn get_gitsync_absolute_paths(&self) -> Vec<String> {
let mut python_path = Vec::<String>::new();
for (i, git_sync) in self.spec.cluster_config.dags_git_sync.iter().enumerate() {
let folder = &git_sync.git_folder.display();
python_path.push(format!("{AIRFLOW_DAGS_FOLDER}/current-{i}/{folder}").to_string())
}
python_path
}

/// Retrieve and merge resource configs for the executor template
pub fn merged_executor_config(
&self,
Expand Down Expand Up @@ -600,10 +636,13 @@ impl AirflowRole {
format!(
"cp -RL {CONFIG_PATH}/{AIRFLOW_CONFIG_FILENAME} {AIRFLOW_HOME}/{AIRFLOW_CONFIG_FILENAME}"
),
// Adding cm as dags within the same AIRFLOW_DAGS_FOLDER may lead to problems, thus checking if exists.
format!("mkdir -p {AIRFLOW_DAGS_FOLDER}"),
// graceful shutdown part
COMMON_BASH_TRAP_FUNCTIONS.to_string(),
remove_vector_shutdown_file_command(STACKABLE_LOG_DIR),
];
command.extend(airflow.get_multi_gitsync_commands());

if resolved_product_image.product_version.starts_with("3.") {
// Start-up commands have changed in 3.x.
Expand Down Expand Up @@ -663,6 +702,7 @@ impl AirflowRole {
container_debug_command(),
"airflow triggerer &".to_string(),
]),
// This essentially doesn't work for KubernetesExecutor
AirflowRole::Worker => command.extend(vec![
"prepare_signal_handlers".to_string(),
container_debug_command(),
Expand Down
54 changes: 19 additions & 35 deletions rust/operator-binary/src/env_vars.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,15 @@ use std::{
use product_config::types::PropertyNameKind;
use snafu::Snafu;
use stackable_operator::{
commons::product_image_selection::ResolvedProductImage,
crd::{authentication::oidc, git_sync},
k8s_openapi::api::core::v1::EnvVar,
kube::ResourceExt,
commons::product_image_selection::ResolvedProductImage, crd::authentication::oidc,
k8s_openapi::api::core::v1::EnvVar, kube::ResourceExt,
product_logging::framework::create_vector_shutdown_file_command,
};

use crate::{
crd::{
AirflowExecutor, AirflowRole, ExecutorConfig, LOG_CONFIG_DIR, STACKABLE_LOG_DIR,
TEMPLATE_LOCATION, TEMPLATE_NAME,
AIRFLOW_DAGS_FOLDER, AirflowExecutor, AirflowRole, ExecutorConfig, LOG_CONFIG_DIR,
STACKABLE_LOG_DIR, TEMPLATE_LOCATION, TEMPLATE_NAME,
authentication::{
AirflowAuthenticationClassResolved, AirflowClientAuthenticationDetailsResolved,
},
Expand Down Expand Up @@ -80,14 +78,13 @@ pub fn build_airflow_statefulset_envs(
executor: &AirflowExecutor,
auth_config: &AirflowClientAuthenticationDetailsResolved,
authorization_config: &AirflowAuthorizationResolved,
git_sync_resources: &git_sync::v1alpha2::GitSyncResources,
resolved_product_image: &ResolvedProductImage,
) -> Result<Vec<EnvVar>, Error> {
let mut env: BTreeMap<String, EnvVar> = BTreeMap::new();
let secret = airflow.spec.cluster_config.credentials_secret.as_str();
let internal_secret_name = airflow.shared_internal_secret_secret_name();

env.extend(static_envs(git_sync_resources));
env.extend(static_envs(airflow));

// environment variables
let env_vars = rolegroup_config.get(&PropertyNameKind::Env);
Expand Down Expand Up @@ -154,12 +151,11 @@ pub fn build_airflow_statefulset_envs(
);
}

let dags_folder = get_dags_folder(git_sync_resources);
env.insert(
AIRFLOW_CORE_DAGS_FOLDER.into(),
EnvVar {
name: AIRFLOW_CORE_DAGS_FOLDER.into(),
value: Some(dags_folder),
value: Some(AIRFLOW_DAGS_FOLDER.to_owned()),
..Default::default()
},
);
Expand Down Expand Up @@ -288,42 +284,32 @@ pub fn build_airflow_statefulset_envs(
Ok(transform_map_to_vec(env))
}

pub fn get_dags_folder(git_sync_resources: &git_sync::v1alpha2::GitSyncResources) -> String {
let git_sync_count = git_sync_resources.git_content_folders.len();
if git_sync_count > 1 {
tracing::warn!(
"There are {git_sync_count} git-sync entries: Only the first one will be considered.",
);
// contains absolute path to git-sync folder and log config.
fn construct_python_path(airflow: &v1alpha2::AirflowCluster) -> String {
let mut python_path = LOG_CONFIG_DIR.to_string();
let symlinks = airflow.get_gitsync_absolute_paths();
// append `:` only of there are git-sync entries.
if !symlinks.is_empty() {
python_path.push(':');
python_path.push_str(symlinks.join(":").as_str());
}

// If DAG provisioning via git-sync is not configured, set a default value
// so that PYTHONPATH can refer to it. N.B. nested variables need to be
// resolved, so that /stackable/airflow is used instead of $AIRFLOW_HOME.
// see https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#dags-folder
git_sync_resources
.git_content_folders_as_string()
.first()
.cloned()
.unwrap_or("/stackable/airflow/dags".to_string())
python_path
}

// This set of environment variables is a standard set that is not dependent on any
// conditional logic and should be applied to the statefulset or the executor template config map.
fn static_envs(
git_sync_resources: &git_sync::v1alpha2::GitSyncResources,
) -> BTreeMap<String, EnvVar> {
fn static_envs(airflow: &v1alpha2::AirflowCluster) -> BTreeMap<String, EnvVar> {
let mut env: BTreeMap<String, EnvVar> = BTreeMap::new();

let dags_folder = get_dags_folder(git_sync_resources);

env.insert(
PYTHONPATH.into(),
EnvVar {
// PYTHONPATH must be extended to include the dags folder so that dag
// dependencies can be found: this must be the actual path and not a variable.
// Also include the airflow site-packages by default (for airflow and kubernetes classes etc.)
name: PYTHONPATH.into(),
value: Some(format!("{LOG_CONFIG_DIR}:{dags_folder}")),
value: Some(construct_python_path(airflow)),
..Default::default()
},
);
Expand Down Expand Up @@ -372,7 +358,6 @@ pub fn build_airflow_template_envs(
airflow: &v1alpha2::AirflowCluster,
env_overrides: &HashMap<String, String>,
config: &ExecutorConfig,
git_sync_resources: &git_sync::v1alpha2::GitSyncResources,
resolved_product_image: &ResolvedProductImage,
) -> Vec<EnvVar> {
let mut env: BTreeMap<String, EnvVar> = BTreeMap::new();
Expand Down Expand Up @@ -407,17 +392,16 @@ pub fn build_airflow_template_envs(

// the config map also requires the dag-folder location as this will be passed on
// to the pods started by airflow.
let dags_folder = get_dags_folder(git_sync_resources);
env.insert(
AIRFLOW_CORE_DAGS_FOLDER.into(),
EnvVar {
name: AIRFLOW_CORE_DAGS_FOLDER.into(),
value: Some(dags_folder),
value: Some(AIRFLOW_DAGS_FOLDER.to_owned()),
..Default::default()
},
);

env.extend(static_envs(git_sync_resources));
env.extend(static_envs(airflow));

add_version_specific_env_vars(
airflow,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,17 +77,26 @@ spec:
dagsGitSync:
{% if test_scenario['values']['access'] == 'ssh' %}
- repo: ssh://git@github.com/stackable-airflow/dags.git
branch: test/git-sync-0
credentials:
sshPrivateKeySecretName: git-sync-ssh
- repo: ssh://git@github.com/stackable-airflow/dags.git
branch: test/git-sync-1
credentials:
sshPrivateKeySecretName: git-sync-ssh
{% endif %}
{% if test_scenario['values']['access'] == 'https' %}
- repo: https://github.com/stackable-airflow/dags
branch: test/git-sync-0
credentials:
basicAuthSecretName: git-credentials
- repo: https://github.com/stackable-airflow/dags
branch: test/git-sync-1
credentials:
basicAuthSecretName: git-credentials
{% endif %}
{% if test_scenario['values']['executor'] == 'celery' %}
# Just setting some values to increase the test coverage (defaults should work just fine)
branch: main
wait: 5s
{% endif %}
gitSyncConf:
Expand Down
Loading
Loading