diff --git a/.ci/examples/py-examples/pulsarfunction-1.0.0-py3-none-any.whl b/.ci/examples/py-examples/pulsarfunction-1.0.0-py3-none-any.whl index a0e11d0ae..99a959e8a 100644 Binary files a/.ci/examples/py-examples/pulsarfunction-1.0.0-py3-none-any.whl and b/.ci/examples/py-examples/pulsarfunction-1.0.0-py3-none-any.whl differ diff --git a/.ci/tests/integration-oauth2/cases/java-download-function-with-package-service/manifests.yaml b/.ci/tests/integration-oauth2/cases/java-download-function-with-package-service/manifests.yaml new file mode 100644 index 000000000..c4d407e6e --- /dev/null +++ b/.ci/tests/integration-oauth2/cases/java-download-function-with-package-service/manifests.yaml @@ -0,0 +1,92 @@ +apiVersion: compute.functionmesh.io/v1alpha1 +kind: Function +metadata: + name: function-download-sample-package-service + namespace: default +spec: + image: streamnative/pulsar-functions-pulsarctl-java-runner:3.2.2.1 + className: org.apache.pulsar.functions.api.examples.ExclamationFunction + forwardSourceMessageProperty: true + maxPendingAsyncRequests: 1000 + replicas: 1 + logTopic: persistent://public/default/logging-function-logs-package-service + logTopicAgent: sidecar + input: + topics: + - persistent://public/default/input-download-java-package-service-topic + typeClassName: java.lang.String + output: + topic: persistent://public/default/output-download-java-package-service-topic + typeClassName: java.lang.String + resources: + requests: + cpu: 50m + memory: 1G + limits: + cpu: "0.2" + memory: 1.1G + secretsMap: + "name": + path: "test-secret" + key: "username" + "pwd": + path: "test-secret" + key: "password" + pulsar: + pulsarConfig: "test-pulsar-runtime" + tlsConfig: + enabled: false + allowInsecure: false + hostnameVerification: true + certSecretName: sn-platform-tls-broker + certSecretKey: "" + authConfig: + oauth2Config: + audience: urn:sn:pulsar:sndev:test + issuerUrl: https://auth.sncloud-stg.dev/ + keySecretName: sn-platform-oauth2-private-key + keySecretKey: auth.json + packageService: + pulsarConfig: "test-pulsar-package-service" + tlsConfig: + enabled: false + allowInsecure: false + hostnameVerification: true + certSecretName: sn-platform-tls-broker + certSecretKey: "" + authConfig: + oauth2Config: + audience: urn:sn:pulsar:sndev:test + issuerUrl: https://auth.sncloud-stg.dev/ + keySecretName: sn-platform-oauth2-private-key + keySecretKey: auth.json + java: + jar: pulsar-functions-api-examples.jar + jarLocation: function://public/default/test-java-function + clusterName: test + autoAck: true +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: test-pulsar-runtime +data: + webServiceURL: http://invalid-runtime-admin.default.svc.cluster.local:8080 + brokerServiceURL: pulsar://sn-platform-pulsar-broker.default.svc.cluster.local:6650 +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: test-pulsar-package-service +data: + webServiceURL: http://sn-platform-pulsar-broker.default.svc.cluster.local:8080 + brokerServiceURL: pulsar://sn-platform-pulsar-broker.default.svc.cluster.local:6650 +--- +apiVersion: v1 +data: + username: YWRtaW4= + password: MWYyZDFlMmU2N2Rm +kind: Secret +metadata: + name: test-secret +type: Opaque diff --git a/.ci/tests/integration-oauth2/cases/java-download-function-with-package-service/verify.sh b/.ci/tests/integration-oauth2/cases/java-download-function-with-package-service/verify.sh new file mode 100755 index 000000000..6626b2e8d --- /dev/null +++ b/.ci/tests/integration-oauth2/cases/java-download-function-with-package-service/verify.sh @@ -0,0 +1,83 @@ +#!/usr/bin/env bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +set -e + +E2E_DIR=$(dirname "$0") +BASE_DIR=$(cd "${E2E_DIR}"/../../../../..;pwd) +PULSAR_NAMESPACE=${PULSAR_NAMESPACE:-"default"} +PULSAR_RELEASE_NAME=${PULSAR_RELEASE_NAME:-"sn-platform"} +E2E_KUBECONFIG=${E2E_KUBECONFIG:-"/tmp/e2e-k8s.config"} +MANIFESTS_FILE="${BASE_DIR}"/.ci/tests/integration-oauth2/cases/java-download-function-with-package-service/manifests.yaml + +source "${BASE_DIR}"/.ci/helm.sh + +FUNCTION_NAME=function-download-sample-package-service +STS_NAME=${FUNCTION_NAME}-function + +if [ ! "$KUBECONFIG" ]; then + export KUBECONFIG=${E2E_KUBECONFIG} +fi + +kubectl apply -f "${MANIFESTS_FILE}" > /dev/null 2>&1 + +verify_fm_result=$(ci::verify_function_mesh ${FUNCTION_NAME} 2>&1) +if [ $? -ne 0 ]; then + echo "$verify_fm_result" + kubectl delete -f "${MANIFESTS_FILE}" > /dev/null 2>&1 || true + exit 1 +fi + +sts_yaml=$(kubectl get statefulset "${STS_NAME}" -o yaml 2>&1) +echo "${sts_yaml}" | grep "prefix: PACKAGE_" > /dev/null 2>&1 || { + echo "packageService env prefix is not injected" + kubectl delete -f "${MANIFESTS_FILE}" > /dev/null 2>&1 || true + exit 1 +} +echo "${sts_yaml}" | grep "name: test-pulsar-package-service" > /dev/null 2>&1 || { + echo "packageService configmap is not injected" + kubectl delete -f "${MANIFESTS_FILE}" > /dev/null 2>&1 || true + exit 1 +} +echo "${sts_yaml}" | grep "/etc/oauth2-package-service" > /dev/null 2>&1 || { + echo "packageService oauth2 volume mount is not injected" + kubectl delete -f "${MANIFESTS_FILE}" > /dev/null 2>&1 || true + exit 1 +} + +verify_java_result=$(NAMESPACE=${PULSAR_NAMESPACE} CLUSTER=${PULSAR_RELEASE_NAME} ci::verify_exclamation_function_with_auth \ + persistent://public/default/input-download-java-package-service-topic \ + persistent://public/default/output-download-java-package-service-topic \ + test-message test-message! 10 2>&1) +if [ $? -ne 0 ]; then + echo "$verify_java_result" + kubectl delete -f "${MANIFESTS_FILE}" > /dev/null 2>&1 || true + exit 1 +fi + +verify_log_topic=$(ci::verify_log_topic_with_auth persistent://public/default/logging-function-logs-package-service "it is not a NAR file" 10 2>&1) +if [ $? -ne 0 ]; then + echo "$verify_log_topic" + kubectl delete -f "${MANIFESTS_FILE}" > /dev/null 2>&1 || true + exit 1 +fi + +kubectl delete -f "${MANIFESTS_FILE}" > /dev/null 2>&1 || true +echo "e2e-test: ok" | yq eval - diff --git a/.ci/tests/integration-oauth2/e2e.yaml b/.ci/tests/integration-oauth2/e2e.yaml index 6f5f65ec5..6eec760dd 100644 --- a/.ci/tests/integration-oauth2/e2e.yaml +++ b/.ci/tests/integration-oauth2/e2e.yaml @@ -131,6 +131,8 @@ verify: expected: expected.data.yaml - query: timeout 5m bash .ci/tests/integration-oauth2/cases/java-download-function/verify.sh expected: expected.data.yaml + - query: timeout 5m bash .ci/tests/integration-oauth2/cases/java-download-function-with-package-service/verify.sh + expected: expected.data.yaml - query: timeout 5m bash .ci/tests/integration-oauth2/cases/java-download-function-generic-auth/verify.sh expected: expected.data.yaml - query: timeout 5m bash .ci/tests/integration-oauth2/cases/py-download-function/verify.sh diff --git a/.ci/tests/integration-oauth2/e2e_with_downloader.yaml b/.ci/tests/integration-oauth2/e2e_with_downloader.yaml index c17efcd95..bb2b52f46 100644 --- a/.ci/tests/integration-oauth2/e2e_with_downloader.yaml +++ b/.ci/tests/integration-oauth2/e2e_with_downloader.yaml @@ -125,6 +125,8 @@ verify: expected: expected.data.yaml - query: timeout 5m bash .ci/tests/integration-oauth2/cases/java-download-function/verify.sh expected: expected.data.yaml + - query: timeout 5m bash .ci/tests/integration-oauth2/cases/java-download-function-with-package-service/verify.sh + expected: expected.data.yaml - query: timeout 5m bash .ci/tests/integration-oauth2/cases/java-download-function-generic-auth/verify.sh expected: expected.data.yaml - query: timeout 5m bash .ci/tests/integration-oauth2/cases/py-download-function/verify.sh diff --git a/.github/workflows/bundle-release.yml b/.github/workflows/bundle-release.yml index 419430086..dab7eea51 100644 --- a/.github/workflows/bundle-release.yml +++ b/.github/workflows/bundle-release.yml @@ -49,10 +49,10 @@ jobs: username: ${{ secrets.DOCKER_USER }} password: ${{ secrets.DOCKER_PASSWORD }} - - name: Set up GO 1.24.11 + - name: Set up GO 1.24.13 uses: actions/setup-go@v5 with: - go-version: 1.24.11 + go-version: 1.24.13 id: go - name: InstallKubebuilder @@ -180,10 +180,10 @@ jobs: username: ${{ secrets.DOCKER_USER }} password: ${{ secrets.DOCKER_PASSWORD }} - - name: Set up GO 1.24.11 + - name: Set up GO 1.24.13 uses: actions/setup-go@v5 with: - go-version: 1.24.11 + go-version: 1.24.13 id: go - name: InstallKubebuilder diff --git a/.github/workflows/olm-verify.yml b/.github/workflows/olm-verify.yml index 40d8e6695..068f76ed1 100644 --- a/.github/workflows/olm-verify.yml +++ b/.github/workflows/olm-verify.yml @@ -34,10 +34,10 @@ jobs: - name: checkout uses: actions/checkout@v2 - - name: Set up GO 1.24.11 + - name: Set up GO 1.24.13 uses: actions/setup-go@v5 with: - go-version: 1.24.11 + go-version: 1.24.13 id: go - name: InstallKubebuilder diff --git a/.github/workflows/project.yml b/.github/workflows/project.yml index 66a87e81f..8970481db 100644 --- a/.github/workflows/project.yml +++ b/.github/workflows/project.yml @@ -18,7 +18,7 @@ jobs: strategy: fail-fast: false matrix: - go-version: [1.22.12, 1.24.11] + go-version: [1.22.12, 1.24.13] steps: - name: Free Disk Space (Ubuntu) uses: jlumbroso/free-disk-space@v1.3.0 diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index b6e882c9d..44ceef083 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -37,10 +37,10 @@ jobs: username: ${{ secrets.DOCKER_USER }} password: ${{ secrets.DOCKER_PASSWORD }} - - name: Set up GO 1.24.11 + - name: Set up GO 1.24.13 uses: actions/setup-go@v5 with: - go-version: 1.24.11 + go-version: 1.24.13 id: go - name: InstallKubebuilder diff --git a/.github/workflows/test-helm-charts.yml b/.github/workflows/test-helm-charts.yml index 89b3c6ce4..3c1b86028 100644 --- a/.github/workflows/test-helm-charts.yml +++ b/.github/workflows/test-helm-charts.yml @@ -83,11 +83,11 @@ jobs: run: hack/kind-cluster-build.sh --name chart-testing -c 1 -v 10 --k8sVersion v1.23.17 if: steps.list-changed.outputs.changed == 'true' - - name: Set up GO 1.24.11 + - name: Set up GO 1.24.13 if: steps.list-changed.outputs.changed == 'true' uses: actions/setup-go@v5 with: - go-version: 1.24.11 + go-version: 1.24.13 id: go - name: setup kubebuilder 3.6.0 diff --git a/.github/workflows/trivy.yml b/.github/workflows/trivy.yml index 901d88cf4..c9aa13869 100644 --- a/.github/workflows/trivy.yml +++ b/.github/workflows/trivy.yml @@ -57,10 +57,10 @@ jobs: repository: ${{github.event.pull_request.head.repo.full_name}} ref: ${{ github.event.pull_request.head.sha }} - - name: Set up GO 1.24.11 + - name: Set up GO 1.24.13 uses: actions/setup-go@v5 with: - go-version: 1.24.11 + go-version: 1.24.13 id: go - name: InstallKubebuilder diff --git a/.github/workflows/trivy_scheduled_master.yml b/.github/workflows/trivy_scheduled_master.yml index 725efe37f..e40563606 100644 --- a/.github/workflows/trivy_scheduled_master.yml +++ b/.github/workflows/trivy_scheduled_master.yml @@ -67,10 +67,10 @@ jobs: repository: ${{github.event.pull_request.head.repo.full_name}} ref: ${{ github.event.pull_request.head.sha }} - - name: Set up GO 1.24.11 + - name: Set up GO 1.24.13 uses: actions/setup-go@v5 with: - go-version: 1.24.11 + go-version: 1.24.13 id: go - name: InstallKubebuilder diff --git a/Dockerfile b/Dockerfile index 0beea82fa..9e25b98c8 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,5 +1,5 @@ # Build the manager binary -FROM golang:1.24.11-trixie as builder +FROM golang:1.24.13-trixie as builder WORKDIR /workspace/api COPY api/ . diff --git a/api/compute/v1alpha1/function_types.go b/api/compute/v1alpha1/function_types.go index 2ac613977..5379437c7 100644 --- a/api/compute/v1alpha1/function_types.go +++ b/api/compute/v1alpha1/function_types.go @@ -93,6 +93,9 @@ type FunctionSpec struct { // +kubebuilder:validation:Required Messaging `json:",inline"` + // PackageService is used for package download when specified. + // If empty, the package download falls back to Messaging.Pulsar. + PackageService *PulsarMessaging `json:"packageService,omitempty"` // +kubebuilder:validation:Required Runtime `json:",inline"` diff --git a/api/compute/v1alpha1/sink_types.go b/api/compute/v1alpha1/sink_types.go index 94f3ab9c8..ddb6d6048 100644 --- a/api/compute/v1alpha1/sink_types.go +++ b/api/compute/v1alpha1/sink_types.go @@ -86,6 +86,9 @@ type SinkSpec struct { // +kubebuilder:validation:Required Messaging `json:",inline"` + // PackageService is used for package download when specified. + // If empty, the package download falls back to Messaging.Pulsar. + PackageService *PulsarMessaging `json:"packageService,omitempty"` // +kubebuilder:validation:Required Runtime `json:",inline"` diff --git a/api/compute/v1alpha1/source_types.go b/api/compute/v1alpha1/source_types.go index 7de789600..8f116e9c7 100644 --- a/api/compute/v1alpha1/source_types.go +++ b/api/compute/v1alpha1/source_types.go @@ -82,6 +82,9 @@ type SourceSpec struct { // +kubebuilder:validation:Required Messaging `json:",inline"` + // PackageService is used for package download when specified. + // If empty, the package download falls back to Messaging.Pulsar. + PackageService *PulsarMessaging `json:"packageService,omitempty"` // +kubebuilder:validation:Required Runtime `json:",inline"` diff --git a/api/compute/v1alpha1/zz_generated.deepcopy.go b/api/compute/v1alpha1/zz_generated.deepcopy.go index 31bd91e0e..6d44ac0a9 100644 --- a/api/compute/v1alpha1/zz_generated.deepcopy.go +++ b/api/compute/v1alpha1/zz_generated.deepcopy.go @@ -536,6 +536,11 @@ func (in *FunctionSpec) DeepCopyInto(out *FunctionSpec) { (*in).DeepCopyInto(*out) } in.Messaging.DeepCopyInto(&out.Messaging) + if in.PackageService != nil { + in, out := &in.PackageService, &out.PackageService + *out = new(PulsarMessaging) + (*in).DeepCopyInto(*out) + } in.Runtime.DeepCopyInto(&out.Runtime) if in.StateConfig != nil { in, out := &in.StateConfig, &out.StateConfig @@ -1271,6 +1276,11 @@ func (in *SinkSpec) DeepCopyInto(out *SinkSpec) { } in.Pod.DeepCopyInto(&out.Pod) in.Messaging.DeepCopyInto(&out.Messaging) + if in.PackageService != nil { + in, out := &in.PackageService, &out.PackageService + *out = new(PulsarMessaging) + (*in).DeepCopyInto(*out) + } in.Runtime.DeepCopyInto(&out.Runtime) if in.StateConfig != nil { in, out := &in.StateConfig, &out.StateConfig @@ -1444,6 +1454,11 @@ func (in *SourceSpec) DeepCopyInto(out *SourceSpec) { } in.Pod.DeepCopyInto(&out.Pod) in.Messaging.DeepCopyInto(&out.Messaging) + if in.PackageService != nil { + in, out := &in.PackageService, &out.PackageService + *out = new(PulsarMessaging) + (*in).DeepCopyInto(*out) + } in.Runtime.DeepCopyInto(&out.Runtime) if in.StateConfig != nil { in, out := &in.StateConfig, &out.StateConfig diff --git a/api/go.mod b/api/go.mod index 846e2ce52..b2297c29a 100644 --- a/api/go.mod +++ b/api/go.mod @@ -1,6 +1,6 @@ module github.com/streamnative/function-mesh/api -go 1.24.11 +go 1.24.13 require ( k8s.io/api v0.30.9 diff --git a/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-functionmeshes.yaml b/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-functionmeshes.yaml index 31399d487..c2a144942 100644 --- a/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-functionmeshes.yaml +++ b/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-functionmeshes.yaml @@ -352,6 +352,90 @@ spec: typeClassName: type: string type: object + packageService: + properties: + authConfig: + properties: + genericAuth: + properties: + clientAuthenticationParameters: + type: string + clientAuthenticationPlugin: + type: string + required: + - clientAuthenticationParameters + - clientAuthenticationPlugin + type: object + oauth2Config: + properties: + audience: + type: string + issuerUrl: + type: string + keySecretKey: + type: string + keySecretName: + type: string + scope: + type: string + required: + - audience + - issuerUrl + - keySecretKey + - keySecretName + type: object + type: object + authSecret: + type: string + cleanupAuthConfig: + properties: + genericAuth: + properties: + clientAuthenticationParameters: + type: string + clientAuthenticationPlugin: + type: string + required: + - clientAuthenticationParameters + - clientAuthenticationPlugin + type: object + oauth2Config: + properties: + audience: + type: string + issuerUrl: + type: string + keySecretKey: + type: string + keySecretName: + type: string + scope: + type: string + required: + - audience + - issuerUrl + - keySecretKey + - keySecretName + type: object + type: object + pulsarConfig: + type: string + tlsConfig: + properties: + allowInsecure: + type: boolean + certSecretKey: + type: string + certSecretName: + type: string + enabled: + type: boolean + hostnameVerification: + type: boolean + type: object + tlsSecret: + type: string + type: object persistentVolumeClaimRetentionPolicy: properties: whenDeleted: @@ -4276,6 +4360,90 @@ spec: negativeAckRedeliveryDelayMs: format: int32 type: integer + packageService: + properties: + authConfig: + properties: + genericAuth: + properties: + clientAuthenticationParameters: + type: string + clientAuthenticationPlugin: + type: string + required: + - clientAuthenticationParameters + - clientAuthenticationPlugin + type: object + oauth2Config: + properties: + audience: + type: string + issuerUrl: + type: string + keySecretKey: + type: string + keySecretName: + type: string + scope: + type: string + required: + - audience + - issuerUrl + - keySecretKey + - keySecretName + type: object + type: object + authSecret: + type: string + cleanupAuthConfig: + properties: + genericAuth: + properties: + clientAuthenticationParameters: + type: string + clientAuthenticationPlugin: + type: string + required: + - clientAuthenticationParameters + - clientAuthenticationPlugin + type: object + oauth2Config: + properties: + audience: + type: string + issuerUrl: + type: string + keySecretKey: + type: string + keySecretName: + type: string + scope: + type: string + required: + - audience + - issuerUrl + - keySecretKey + - keySecretName + type: object + type: object + pulsarConfig: + type: string + tlsConfig: + properties: + allowInsecure: + type: boolean + certSecretKey: + type: string + certSecretName: + type: string + enabled: + type: boolean + hostnameVerification: + type: boolean + type: object + tlsSecret: + type: string + type: object pod: properties: affinity: @@ -7940,6 +8108,90 @@ spec: typeClassName: type: string type: object + packageService: + properties: + authConfig: + properties: + genericAuth: + properties: + clientAuthenticationParameters: + type: string + clientAuthenticationPlugin: + type: string + required: + - clientAuthenticationParameters + - clientAuthenticationPlugin + type: object + oauth2Config: + properties: + audience: + type: string + issuerUrl: + type: string + keySecretKey: + type: string + keySecretName: + type: string + scope: + type: string + required: + - audience + - issuerUrl + - keySecretKey + - keySecretName + type: object + type: object + authSecret: + type: string + cleanupAuthConfig: + properties: + genericAuth: + properties: + clientAuthenticationParameters: + type: string + clientAuthenticationPlugin: + type: string + required: + - clientAuthenticationParameters + - clientAuthenticationPlugin + type: object + oauth2Config: + properties: + audience: + type: string + issuerUrl: + type: string + keySecretKey: + type: string + keySecretName: + type: string + scope: + type: string + required: + - audience + - issuerUrl + - keySecretKey + - keySecretName + type: object + type: object + pulsarConfig: + type: string + tlsConfig: + properties: + allowInsecure: + type: boolean + certSecretKey: + type: string + certSecretName: + type: string + enabled: + type: boolean + hostnameVerification: + type: boolean + type: object + tlsSecret: + type: string + type: object pod: properties: affinity: diff --git a/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-functions.yaml b/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-functions.yaml index 369bbb45b..e9b293657 100644 --- a/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-functions.yaml +++ b/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-functions.yaml @@ -371,6 +371,90 @@ spec: typeClassName: type: string type: object + packageService: + properties: + authConfig: + properties: + genericAuth: + properties: + clientAuthenticationParameters: + type: string + clientAuthenticationPlugin: + type: string + required: + - clientAuthenticationParameters + - clientAuthenticationPlugin + type: object + oauth2Config: + properties: + audience: + type: string + issuerUrl: + type: string + keySecretKey: + type: string + keySecretName: + type: string + scope: + type: string + required: + - audience + - issuerUrl + - keySecretKey + - keySecretName + type: object + type: object + authSecret: + type: string + cleanupAuthConfig: + properties: + genericAuth: + properties: + clientAuthenticationParameters: + type: string + clientAuthenticationPlugin: + type: string + required: + - clientAuthenticationParameters + - clientAuthenticationPlugin + type: object + oauth2Config: + properties: + audience: + type: string + issuerUrl: + type: string + keySecretKey: + type: string + keySecretName: + type: string + scope: + type: string + required: + - audience + - issuerUrl + - keySecretKey + - keySecretName + type: object + type: object + pulsarConfig: + type: string + tlsConfig: + properties: + allowInsecure: + type: boolean + certSecretKey: + type: string + certSecretName: + type: string + enabled: + type: boolean + hostnameVerification: + type: boolean + type: object + tlsSecret: + type: string + type: object persistentVolumeClaimRetentionPolicy: properties: whenDeleted: diff --git a/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-sinks.yaml b/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-sinks.yaml index f91eeeee1..924f1b1d8 100644 --- a/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-sinks.yaml +++ b/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-sinks.yaml @@ -299,6 +299,90 @@ spec: negativeAckRedeliveryDelayMs: format: int32 type: integer + packageService: + properties: + authConfig: + properties: + genericAuth: + properties: + clientAuthenticationParameters: + type: string + clientAuthenticationPlugin: + type: string + required: + - clientAuthenticationParameters + - clientAuthenticationPlugin + type: object + oauth2Config: + properties: + audience: + type: string + issuerUrl: + type: string + keySecretKey: + type: string + keySecretName: + type: string + scope: + type: string + required: + - audience + - issuerUrl + - keySecretKey + - keySecretName + type: object + type: object + authSecret: + type: string + cleanupAuthConfig: + properties: + genericAuth: + properties: + clientAuthenticationParameters: + type: string + clientAuthenticationPlugin: + type: string + required: + - clientAuthenticationParameters + - clientAuthenticationPlugin + type: object + oauth2Config: + properties: + audience: + type: string + issuerUrl: + type: string + keySecretKey: + type: string + keySecretName: + type: string + scope: + type: string + required: + - audience + - issuerUrl + - keySecretKey + - keySecretName + type: object + type: object + pulsarConfig: + type: string + tlsConfig: + properties: + allowInsecure: + type: boolean + certSecretKey: + type: string + certSecretName: + type: string + enabled: + type: boolean + hostnameVerification: + type: boolean + type: object + tlsSecret: + type: string + type: object pod: properties: affinity: diff --git a/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-sources.yaml b/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-sources.yaml index 219643f22..2b353937a 100644 --- a/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-sources.yaml +++ b/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-sources.yaml @@ -294,6 +294,90 @@ spec: typeClassName: type: string type: object + packageService: + properties: + authConfig: + properties: + genericAuth: + properties: + clientAuthenticationParameters: + type: string + clientAuthenticationPlugin: + type: string + required: + - clientAuthenticationParameters + - clientAuthenticationPlugin + type: object + oauth2Config: + properties: + audience: + type: string + issuerUrl: + type: string + keySecretKey: + type: string + keySecretName: + type: string + scope: + type: string + required: + - audience + - issuerUrl + - keySecretKey + - keySecretName + type: object + type: object + authSecret: + type: string + cleanupAuthConfig: + properties: + genericAuth: + properties: + clientAuthenticationParameters: + type: string + clientAuthenticationPlugin: + type: string + required: + - clientAuthenticationParameters + - clientAuthenticationPlugin + type: object + oauth2Config: + properties: + audience: + type: string + issuerUrl: + type: string + keySecretKey: + type: string + keySecretName: + type: string + scope: + type: string + required: + - audience + - issuerUrl + - keySecretKey + - keySecretName + type: object + type: object + pulsarConfig: + type: string + tlsConfig: + properties: + allowInsecure: + type: boolean + certSecretKey: + type: string + certSecretName: + type: string + enabled: + type: boolean + hostnameVerification: + type: boolean + type: object + tlsSecret: + type: string + type: object pod: properties: affinity: diff --git a/config/crd/bases/compute.functionmesh.io_functionmeshes.yaml b/config/crd/bases/compute.functionmesh.io_functionmeshes.yaml index 8c137356a..da0f23e78 100644 --- a/config/crd/bases/compute.functionmesh.io_functionmeshes.yaml +++ b/config/crd/bases/compute.functionmesh.io_functionmeshes.yaml @@ -352,6 +352,90 @@ spec: typeClassName: type: string type: object + packageService: + properties: + authConfig: + properties: + genericAuth: + properties: + clientAuthenticationParameters: + type: string + clientAuthenticationPlugin: + type: string + required: + - clientAuthenticationParameters + - clientAuthenticationPlugin + type: object + oauth2Config: + properties: + audience: + type: string + issuerUrl: + type: string + keySecretKey: + type: string + keySecretName: + type: string + scope: + type: string + required: + - audience + - issuerUrl + - keySecretKey + - keySecretName + type: object + type: object + authSecret: + type: string + cleanupAuthConfig: + properties: + genericAuth: + properties: + clientAuthenticationParameters: + type: string + clientAuthenticationPlugin: + type: string + required: + - clientAuthenticationParameters + - clientAuthenticationPlugin + type: object + oauth2Config: + properties: + audience: + type: string + issuerUrl: + type: string + keySecretKey: + type: string + keySecretName: + type: string + scope: + type: string + required: + - audience + - issuerUrl + - keySecretKey + - keySecretName + type: object + type: object + pulsarConfig: + type: string + tlsConfig: + properties: + allowInsecure: + type: boolean + certSecretKey: + type: string + certSecretName: + type: string + enabled: + type: boolean + hostnameVerification: + type: boolean + type: object + tlsSecret: + type: string + type: object persistentVolumeClaimRetentionPolicy: properties: whenDeleted: @@ -4276,6 +4360,90 @@ spec: negativeAckRedeliveryDelayMs: format: int32 type: integer + packageService: + properties: + authConfig: + properties: + genericAuth: + properties: + clientAuthenticationParameters: + type: string + clientAuthenticationPlugin: + type: string + required: + - clientAuthenticationParameters + - clientAuthenticationPlugin + type: object + oauth2Config: + properties: + audience: + type: string + issuerUrl: + type: string + keySecretKey: + type: string + keySecretName: + type: string + scope: + type: string + required: + - audience + - issuerUrl + - keySecretKey + - keySecretName + type: object + type: object + authSecret: + type: string + cleanupAuthConfig: + properties: + genericAuth: + properties: + clientAuthenticationParameters: + type: string + clientAuthenticationPlugin: + type: string + required: + - clientAuthenticationParameters + - clientAuthenticationPlugin + type: object + oauth2Config: + properties: + audience: + type: string + issuerUrl: + type: string + keySecretKey: + type: string + keySecretName: + type: string + scope: + type: string + required: + - audience + - issuerUrl + - keySecretKey + - keySecretName + type: object + type: object + pulsarConfig: + type: string + tlsConfig: + properties: + allowInsecure: + type: boolean + certSecretKey: + type: string + certSecretName: + type: string + enabled: + type: boolean + hostnameVerification: + type: boolean + type: object + tlsSecret: + type: string + type: object pod: properties: affinity: @@ -7940,6 +8108,90 @@ spec: typeClassName: type: string type: object + packageService: + properties: + authConfig: + properties: + genericAuth: + properties: + clientAuthenticationParameters: + type: string + clientAuthenticationPlugin: + type: string + required: + - clientAuthenticationParameters + - clientAuthenticationPlugin + type: object + oauth2Config: + properties: + audience: + type: string + issuerUrl: + type: string + keySecretKey: + type: string + keySecretName: + type: string + scope: + type: string + required: + - audience + - issuerUrl + - keySecretKey + - keySecretName + type: object + type: object + authSecret: + type: string + cleanupAuthConfig: + properties: + genericAuth: + properties: + clientAuthenticationParameters: + type: string + clientAuthenticationPlugin: + type: string + required: + - clientAuthenticationParameters + - clientAuthenticationPlugin + type: object + oauth2Config: + properties: + audience: + type: string + issuerUrl: + type: string + keySecretKey: + type: string + keySecretName: + type: string + scope: + type: string + required: + - audience + - issuerUrl + - keySecretKey + - keySecretName + type: object + type: object + pulsarConfig: + type: string + tlsConfig: + properties: + allowInsecure: + type: boolean + certSecretKey: + type: string + certSecretName: + type: string + enabled: + type: boolean + hostnameVerification: + type: boolean + type: object + tlsSecret: + type: string + type: object pod: properties: affinity: diff --git a/config/crd/bases/compute.functionmesh.io_functions.yaml b/config/crd/bases/compute.functionmesh.io_functions.yaml index beb60ff6c..58cebd789 100644 --- a/config/crd/bases/compute.functionmesh.io_functions.yaml +++ b/config/crd/bases/compute.functionmesh.io_functions.yaml @@ -349,6 +349,90 @@ spec: typeClassName: type: string type: object + packageService: + properties: + authConfig: + properties: + genericAuth: + properties: + clientAuthenticationParameters: + type: string + clientAuthenticationPlugin: + type: string + required: + - clientAuthenticationParameters + - clientAuthenticationPlugin + type: object + oauth2Config: + properties: + audience: + type: string + issuerUrl: + type: string + keySecretKey: + type: string + keySecretName: + type: string + scope: + type: string + required: + - audience + - issuerUrl + - keySecretKey + - keySecretName + type: object + type: object + authSecret: + type: string + cleanupAuthConfig: + properties: + genericAuth: + properties: + clientAuthenticationParameters: + type: string + clientAuthenticationPlugin: + type: string + required: + - clientAuthenticationParameters + - clientAuthenticationPlugin + type: object + oauth2Config: + properties: + audience: + type: string + issuerUrl: + type: string + keySecretKey: + type: string + keySecretName: + type: string + scope: + type: string + required: + - audience + - issuerUrl + - keySecretKey + - keySecretName + type: object + type: object + pulsarConfig: + type: string + tlsConfig: + properties: + allowInsecure: + type: boolean + certSecretKey: + type: string + certSecretName: + type: string + enabled: + type: boolean + hostnameVerification: + type: boolean + type: object + tlsSecret: + type: string + type: object persistentVolumeClaimRetentionPolicy: properties: whenDeleted: diff --git a/config/crd/bases/compute.functionmesh.io_sinks.yaml b/config/crd/bases/compute.functionmesh.io_sinks.yaml index 3de747270..49d62e2cc 100644 --- a/config/crd/bases/compute.functionmesh.io_sinks.yaml +++ b/config/crd/bases/compute.functionmesh.io_sinks.yaml @@ -277,6 +277,90 @@ spec: negativeAckRedeliveryDelayMs: format: int32 type: integer + packageService: + properties: + authConfig: + properties: + genericAuth: + properties: + clientAuthenticationParameters: + type: string + clientAuthenticationPlugin: + type: string + required: + - clientAuthenticationParameters + - clientAuthenticationPlugin + type: object + oauth2Config: + properties: + audience: + type: string + issuerUrl: + type: string + keySecretKey: + type: string + keySecretName: + type: string + scope: + type: string + required: + - audience + - issuerUrl + - keySecretKey + - keySecretName + type: object + type: object + authSecret: + type: string + cleanupAuthConfig: + properties: + genericAuth: + properties: + clientAuthenticationParameters: + type: string + clientAuthenticationPlugin: + type: string + required: + - clientAuthenticationParameters + - clientAuthenticationPlugin + type: object + oauth2Config: + properties: + audience: + type: string + issuerUrl: + type: string + keySecretKey: + type: string + keySecretName: + type: string + scope: + type: string + required: + - audience + - issuerUrl + - keySecretKey + - keySecretName + type: object + type: object + pulsarConfig: + type: string + tlsConfig: + properties: + allowInsecure: + type: boolean + certSecretKey: + type: string + certSecretName: + type: string + enabled: + type: boolean + hostnameVerification: + type: boolean + type: object + tlsSecret: + type: string + type: object pod: properties: affinity: diff --git a/config/crd/bases/compute.functionmesh.io_sources.yaml b/config/crd/bases/compute.functionmesh.io_sources.yaml index 210d5bca1..2ad88024c 100644 --- a/config/crd/bases/compute.functionmesh.io_sources.yaml +++ b/config/crd/bases/compute.functionmesh.io_sources.yaml @@ -272,6 +272,90 @@ spec: typeClassName: type: string type: object + packageService: + properties: + authConfig: + properties: + genericAuth: + properties: + clientAuthenticationParameters: + type: string + clientAuthenticationPlugin: + type: string + required: + - clientAuthenticationParameters + - clientAuthenticationPlugin + type: object + oauth2Config: + properties: + audience: + type: string + issuerUrl: + type: string + keySecretKey: + type: string + keySecretName: + type: string + scope: + type: string + required: + - audience + - issuerUrl + - keySecretKey + - keySecretName + type: object + type: object + authSecret: + type: string + cleanupAuthConfig: + properties: + genericAuth: + properties: + clientAuthenticationParameters: + type: string + clientAuthenticationPlugin: + type: string + required: + - clientAuthenticationParameters + - clientAuthenticationPlugin + type: object + oauth2Config: + properties: + audience: + type: string + issuerUrl: + type: string + keySecretKey: + type: string + keySecretName: + type: string + scope: + type: string + required: + - audience + - issuerUrl + - keySecretKey + - keySecretName + type: object + type: object + pulsarConfig: + type: string + tlsConfig: + properties: + allowInsecure: + type: boolean + certSecretKey: + type: string + certSecretName: + type: string + enabled: + type: boolean + hostnameVerification: + type: boolean + type: object + tlsSecret: + type: string + type: object pod: properties: affinity: diff --git a/controllers/spec/common.go b/controllers/spec/common.go index 1c093590b..fd7199417 100644 --- a/controllers/spec/common.go +++ b/controllers/spec/common.go @@ -78,6 +78,9 @@ const ( DownloaderVolume = "downloader-volume" DownloaderImage = DefaultRunnerPrefix + "pulsarctl:2.10.2.3" DownloadDir = "/pulsar/download" + PackageServiceEnvPrefix = "PACKAGE_" + PackageOAuth2MountPath = "/etc/oauth2-package-service" + PackageTLSMountPath = "/etc/tls/pulsar-functions-package-service" CleanupContainerName = "cleanup" @@ -173,6 +176,115 @@ type TLSConfig interface { GetMountPath() string } +type tlsConfigWithCustomMountPath struct { + TLSConfig + mountPath string +} + +func (t *tlsConfigWithCustomMountPath) GetMountPath() string { + return t.mountPath +} + +type downloadCommandEnvNames struct { + webServiceURL string + clientAuthenticationPlugin string + clientAuthenticationParams string + tlsAllowInsecure string + tlsHostnameVerification string + tlsTrustCertsFilePath string +} + +func defaultDownloadCommandEnvNames() downloadCommandEnvNames { + return downloadCommandEnvNames{ + webServiceURL: "webServiceURL", + clientAuthenticationPlugin: "clientAuthenticationPlugin", + clientAuthenticationParams: "clientAuthenticationParameters", + tlsAllowInsecure: "tlsAllowInsecureConnection", + tlsHostnameVerification: "tlsHostnameVerificationEnable", + tlsTrustCertsFilePath: "tlsTrustCertsFilePath", + } +} + +func prefixedDownloadCommandEnvNames(prefix string) downloadCommandEnvNames { + return downloadCommandEnvNames{ + webServiceURL: prefix + "webServiceURL", + clientAuthenticationPlugin: prefix + "clientAuthenticationPlugin", + clientAuthenticationParams: prefix + "clientAuthenticationParameters", + tlsAllowInsecure: prefix + "tlsAllowInsecureConnection", + tlsHostnameVerification: prefix + "tlsHostnameVerificationEnable", + tlsTrustCertsFilePath: prefix + "tlsTrustCertsFilePath", + } +} + +type downloadServiceConfig struct { + pulsar *v1alpha1.PulsarMessaging + envPrefix string + envNames downloadCommandEnvNames + oauth2Mount string + tlsMount string +} + +func newDownloadServiceConfig(packageService, messaging *v1alpha1.PulsarMessaging) downloadServiceConfig { + if packageService != nil { + return downloadServiceConfig{ + pulsar: packageService, + envPrefix: PackageServiceEnvPrefix, + envNames: prefixedDownloadCommandEnvNames(PackageServiceEnvPrefix), + oauth2Mount: PackageOAuth2MountPath, + tlsMount: PackageTLSMountPath, + } + } + return downloadServiceConfig{ + pulsar: messaging, + envNames: defaultDownloadCommandEnvNames(), + oauth2Mount: "", + tlsMount: "", + } +} + +func (d downloadServiceConfig) authProvided() bool { + return d.pulsar != nil && d.pulsar.AuthSecret != "" +} + +func (d downloadServiceConfig) tlsProvided() bool { + return d.pulsar != nil && d.pulsar.TLSSecret != "" +} + +func (d downloadServiceConfig) authConfig() *v1alpha1.AuthConfig { + if d.pulsar == nil { + return nil + } + return d.pulsar.AuthConfig +} + +func (d downloadServiceConfig) tlsConfig() TLSConfig { + if d.pulsar == nil || d.pulsar.TLSConfig == nil { + return nil + } + if d.tlsMount == "" { + return d.pulsar.TLSConfig + } + return &tlsConfigWithCustomMountPath{ + TLSConfig: d.pulsar.TLSConfig, + mountPath: d.tlsMount, + } +} + +func (d downloadServiceConfig) envFrom() []corev1.EnvFromSource { + if d.pulsar == nil { + return nil + } + return GenerateContainerEnvFromWithPrefix(d.pulsar.PulsarConfig, d.pulsar.AuthSecret, d.pulsar.TLSSecret, d.envPrefix) +} + +func isNilTLSConfig(tlsConfig TLSConfig) bool { + if tlsConfig == nil { + return true + } + value := reflect.ValueOf(tlsConfig) + return value.Kind() == reflect.Ptr && value.IsNil() +} + type RollingCfg struct { Enabled bool Type string // "size" or "time" @@ -239,13 +351,25 @@ func MakeHeadlessServiceName(serviceName string) string { } func MakeStatefulSet(objectMeta *metav1.ObjectMeta, replicas *int32, downloaderImage string, container *corev1.Container, - volumes []corev1.Volume, labels map[string]string, policy v1alpha1.PodPolicy, authConfig *v1alpha1.AuthConfig, - tlsConfig TLSConfig, pulsarConfig, authSecret, tlsSecret string, javaRuntime *v1alpha1.JavaRuntime, + volumes []corev1.Volume, labels map[string]string, policy v1alpha1.PodPolicy, runtimeMessaging *v1alpha1.PulsarMessaging, + downloadConfig downloadServiceConfig, + javaRuntime *v1alpha1.JavaRuntime, pythonRuntime *v1alpha1.PythonRuntime, goRuntime *v1alpha1.GoRuntime, env []corev1.EnvVar, logTopic, filebeatImage string, logTopicAgent v1alpha1.LogTopicAgent, definedVolumeMounts []corev1.VolumeMount, volumeClaimTemplates []corev1.PersistentVolumeClaim, persistentVolumeClaimRetentionPolicy *appsv1.StatefulSetPersistentVolumeClaimRetentionPolicy) *appsv1.StatefulSet { - filebeatContainer := makeFilebeatContainer(definedVolumeMounts, env, logTopic, logTopicAgent, tlsConfig, authConfig, pulsarConfig, tlsSecret, authSecret, filebeatImage) + var runtimeTLSConfig TLSConfig = nil + var runtimeAuthConfig *v1alpha1.AuthConfig = nil + var runtimePulsarConfig, runtimeAuthSecret, runtimeTLSSecret string + if runtimeMessaging != nil { + runtimeTLSConfig = runtimeMessaging.TLSConfig + runtimeAuthConfig = runtimeMessaging.AuthConfig + runtimePulsarConfig = runtimeMessaging.PulsarConfig + runtimeAuthSecret = runtimeMessaging.AuthSecret + runtimeTLSSecret = runtimeMessaging.TLSSecret + } + + filebeatContainer := makeFilebeatContainer(definedVolumeMounts, env, logTopic, logTopicAgent, runtimeTLSConfig, runtimeAuthConfig, runtimePulsarConfig, runtimeTLSSecret, runtimeAuthSecret, filebeatImage) volumeMounts := generateDownloaderVolumeMountsForDownloader(javaRuntime, pythonRuntime, goRuntime) var downloaderContainer *corev1.Container @@ -266,12 +390,19 @@ func MakeStatefulSet(objectMeta *metav1.ObjectMeta, replicas *int32, downloaderI // mount auth and tls related VolumeMounts when download package from pulsar if !hasHTTPPrefix(downloadPath) { - if authConfig != nil && authConfig.OAuth2Config != nil { - volumeMounts = append(volumeMounts, generateVolumeMountFromOAuth2Config(authConfig.OAuth2Config)) + if authConfig := downloadConfig.authConfig(); authConfig != nil && authConfig.OAuth2Config != nil { + oauth2MountPath := authConfig.OAuth2Config.GetMountPath() + if downloadConfig.oauth2Mount != "" { + oauth2MountPath = downloadConfig.oauth2Mount + } + volumeMounts = appendVolumeMountIfNotExists(volumeMounts, generateVolumeMountFromOAuth2ConfigWithMountPath(authConfig.OAuth2Config, oauth2MountPath)) + podVolumes = appendVolumeIfNotExists(podVolumes, generateVolumeFromOAuth2Config(authConfig.OAuth2Config)) } - if !reflect.ValueOf(tlsConfig).IsNil() && tlsConfig.HasSecretVolume() { - volumeMounts = append(volumeMounts, generateVolumeMountFromTLSConfig(tlsConfig)) + tlsConfig := downloadConfig.tlsConfig() + if !isNilTLSConfig(tlsConfig) && tlsConfig.HasSecretVolume() { + volumeMounts = appendVolumeMountIfNotExists(volumeMounts, generateVolumeMountFromTLSConfig(tlsConfig)) + podVolumes = appendVolumeIfNotExists(podVolumes, generateVolumeFromTLSConfig(tlsConfig)) } } volumeMounts = append(volumeMounts, definedVolumeMounts...) @@ -287,17 +418,18 @@ func MakeStatefulSet(objectMeta *metav1.ObjectMeta, replicas *int32, downloaderI Name: DownloaderName, Image: image, Command: []string{"sh", "-c", - strings.Join(GetDownloadCommand(downloadPath, componentPackage, true, true, - authSecret != "", tlsSecret != "", tlsConfig, authConfig), " ")}, + strings.Join(GetDownloadCommandWithEnv(downloadPath, componentPackage, true, true, + downloadConfig.authProvided(), downloadConfig.tlsProvided(), downloadConfig.tlsConfig(), + downloadConfig.authConfig(), downloadConfig.envNames, downloadConfig.oauth2Mount), " ")}, VolumeMounts: volumeMounts, ImagePullPolicy: corev1.PullIfNotPresent, Env: []corev1.EnvVar{{ Name: "HOME", Value: "/tmp", }}, - EnvFrom: GenerateContainerEnvFrom(pulsarConfig, authSecret, tlsSecret), + EnvFrom: downloadConfig.envFrom(), } - podVolumes = append(podVolumes, corev1.Volume{ + podVolumes = appendVolumeIfNotExists(podVolumes, corev1.Volume{ Name: DownloaderVolume, }) } @@ -538,7 +670,8 @@ func GenerateAffinity(affinity *corev1.Affinity, labels map[string]string, disab } func MakeJavaFunctionCommand(downloadPath, packageFile, name, clusterName, generateLogConfigCommand, logLevel, details, extraDependenciesDir, connectorsDirectory, uid string, - memory *resource.Quantity, javaOpts []string, hasPulsarctl, hasWget, authProvided, tlsProvided bool, secretMaps map[string]v1alpha1.SecretRef, + memory *resource.Quantity, javaOpts []string, hasPulsarctl, hasWget bool, downloadConfig downloadServiceConfig, + authProvided, tlsProvided bool, secretMaps map[string]v1alpha1.SecretRef, state *v1alpha1.Stateful, tlsConfig TLSConfig, authConfig *v1alpha1.AuthConfig, maxPendingAsyncRequests *int32, logConfigFileName, instancePath, entryClass string) []string { @@ -548,15 +681,16 @@ func MakeJavaFunctionCommand(downloadPath, packageFile, name, clusterName, gener authConfig, maxPendingAsyncRequests, logConfigFileName, instancePath, entryClass), " ") if downloadPath != "" && !utils.EnableInitContainers { // prepend download command if the downPath is provided - downloadCommand := strings.Join(GetDownloadCommand(downloadPath, packageFile, hasPulsarctl, hasWget, - authProvided, tlsProvided, tlsConfig, authConfig), " ") + downloadCommand := strings.Join(GetDownloadCommandWithEnv(downloadPath, packageFile, hasPulsarctl, hasWget, + downloadConfig.authProvided(), downloadConfig.tlsProvided(), downloadConfig.tlsConfig(), + downloadConfig.authConfig(), downloadConfig.envNames, downloadConfig.oauth2Mount), " ") processCommand = downloadCommand + " && " + processCommand } return []string{"bash", "-c", processCommand} } func MakePythonFunctionCommand(downloadPath, packageFile, name, clusterName, generateLogConfigCommand, details, uid string, - hasPulsarctl, hasWget, authProvided, tlsProvided bool, secretMaps map[string]v1alpha1.SecretRef, + hasPulsarctl, hasWget bool, downloadConfig downloadServiceConfig, authProvided, tlsProvided bool, secretMaps map[string]v1alpha1.SecretRef, state *v1alpha1.Stateful, tlsConfig TLSConfig, authConfig *v1alpha1.AuthConfig) []string { processCommand := setShardIDEnvironmentVariableCommand() + " && " + generateLogConfigCommand + @@ -564,15 +698,16 @@ func MakePythonFunctionCommand(downloadPath, packageFile, name, clusterName, gen details, uid, authProvided, tlsProvided, secretMaps, state, tlsConfig, authConfig), " ") if downloadPath != "" && !utils.EnableInitContainers { // prepend download command if the downPath is provided - downloadCommand := strings.Join(GetDownloadCommand(downloadPath, packageFile, hasPulsarctl, hasWget, - authProvided, - tlsProvided, tlsConfig, authConfig), " ") + downloadCommand := strings.Join(GetDownloadCommandWithEnv(downloadPath, packageFile, hasPulsarctl, hasWget, + downloadConfig.authProvided(), + downloadConfig.tlsProvided(), downloadConfig.tlsConfig(), downloadConfig.authConfig(), + downloadConfig.envNames, downloadConfig.oauth2Mount), " ") processCommand = downloadCommand + " && " + processCommand } return []string{"bash", "-c", processCommand} } -func MakeGoFunctionCommand(downloadPath, goExecFilePath string, function *v1alpha1.Function) []string { +func MakeGoFunctionCommand(downloadPath, goExecFilePath string, function *v1alpha1.Function, downloadConfig downloadServiceConfig) []string { processCommand := setShardIDEnvironmentVariableCommand() + " && " + strings.Join(getProcessGoRuntimeArgs(goExecFilePath, function), " ") if downloadPath != "" && !utils.EnableInitContainers { @@ -583,15 +718,16 @@ func MakeGoFunctionCommand(downloadPath, goExecFilePath string, function *v1alph hasPulsarctl = true hasWget = true } - downloadCommand := strings.Join(GetDownloadCommand(downloadPath, goExecFilePath, - hasPulsarctl, hasWget, function.Spec.Pulsar.AuthSecret != "", - function.Spec.Pulsar.TLSSecret != "", function.Spec.Pulsar.TLSConfig, function.Spec.Pulsar.AuthConfig), " ") + downloadCommand := strings.Join(GetDownloadCommandWithEnv(downloadPath, goExecFilePath, + hasPulsarctl, hasWget, downloadConfig.authProvided(), + downloadConfig.tlsProvided(), downloadConfig.tlsConfig(), downloadConfig.authConfig(), + downloadConfig.envNames, downloadConfig.oauth2Mount), " ") processCommand = downloadCommand + " && ls -al && pwd &&" + processCommand } return []string{"bash", "-c", processCommand} } -func MakeGenericFunctionCommand(downloadPath, functionFile, language, clusterName, details, uid string, authProvided, tlsProvided bool, secretMaps map[string]v1alpha1.SecretRef, +func MakeGenericFunctionCommand(downloadPath, functionFile, language, clusterName, details, uid string, downloadConfig downloadServiceConfig, authProvided, tlsProvided bool, secretMaps map[string]v1alpha1.SecretRef, state *v1alpha1.Stateful, tlsConfig TLSConfig, authConfig *v1alpha1.AuthConfig) []string { processCommand := setShardIDEnvironmentVariableCommand() + " && " + @@ -599,9 +735,10 @@ func MakeGenericFunctionCommand(downloadPath, functionFile, language, clusterNam details, uid, authProvided, tlsProvided, secretMaps, state, tlsConfig, authConfig), " ") if downloadPath != "" && !utils.EnableInitContainers { // prepend download command if the downPath is provided - downloadCommand := strings.Join(GetDownloadCommand(downloadPath, functionFile, true, true, - authProvided, - tlsProvided, tlsConfig, authConfig), " ") + downloadCommand := strings.Join(GetDownloadCommandWithEnv(downloadPath, functionFile, true, true, + downloadConfig.authProvided(), + downloadConfig.tlsProvided(), downloadConfig.tlsConfig(), downloadConfig.authConfig(), + downloadConfig.envNames, downloadConfig.oauth2Mount), " ") processCommand = downloadCommand + " && " + processCommand } return []string{"sh", "-c", processCommand} @@ -684,12 +821,37 @@ func TriggerCleanup(ctx context.Context, k8sclient client.Client, restClient res return nil } +func getOAuth2MountFile(authConfig *v1alpha1.OAuth2Config, mountPath string) string { + if authConfig == nil { + return "" + } + if mountPath == "" { + return authConfig.GetMountFile() + } + return fmt.Sprintf("%s/%s", mountPath, authConfig.KeySecretKey) +} + +func makeOAuth2AuthenticationParameters(authConfig *v1alpha1.OAuth2Config, mountPath string) string { + if authConfig == nil { + return "" + } + credentialsFile := getOAuth2MountFile(authConfig, mountPath) + return fmt.Sprintf(`'{"credentials_url":"file://%s","privateKey":"%s","private_key":"%s","issuerUrl":"%s","issuer_url":"%s","audience":"%s","scope":"%s"}'`, + credentialsFile, credentialsFile, credentialsFile, authConfig.IssuerURL, authConfig.IssuerURL, authConfig.Audience, authConfig.Scope) +} + func getPulsarAdminCommand(authProvided, tlsProvided bool, tlsConfig TLSConfig, authConfig *v1alpha1.AuthConfig) []string { + return getPulsarAdminCommandWithEnv(authProvided, tlsProvided, tlsConfig, authConfig, + defaultDownloadCommandEnvNames(), "") +} + +func getPulsarAdminCommandWithEnv(authProvided, tlsProvided bool, tlsConfig TLSConfig, + authConfig *v1alpha1.AuthConfig, envNames downloadCommandEnvNames, oauth2MountPath string) []string { args := []string{ PulsarAdminExecutableFile, "--admin-url", - "$webServiceURL", + "$" + envNames.webServiceURL, } if authConfig != nil { if authConfig.OAuth2Config != nil { @@ -697,7 +859,7 @@ func getPulsarAdminCommand(authProvided, tlsProvided bool, tlsConfig TLSConfig, "--auth-plugin", OAuth2AuthenticationPlugin, "--auth-params", - authConfig.OAuth2Config.AuthenticationParameters(), + makeOAuth2AuthenticationParameters(authConfig.OAuth2Config, oauth2MountPath), }...) } else if authConfig.GenericAuth != nil { args = append(args, []string{ @@ -710,19 +872,19 @@ func getPulsarAdminCommand(authProvided, tlsProvided bool, tlsConfig TLSConfig, } else if authProvided { args = append(args, []string{ "--auth-plugin", - "$clientAuthenticationPlugin", + "$" + envNames.clientAuthenticationPlugin, "--auth-params", - "$clientAuthenticationParameters"}...) + "$" + envNames.clientAuthenticationParams}...) } // Use traditional way - if reflect.ValueOf(tlsConfig).IsNil() { + if isNilTLSConfig(tlsConfig) { if tlsProvided { args = append(args, []string{ "--tls-allow-insecure", "--tls-enable-hostname-verification", "--tls-trust-cert-path", - "$tlsTrustCertsFilePath", + "$" + envNames.tlsTrustCertsFilePath, }...) } } else { @@ -749,11 +911,17 @@ func getPulsarAdminCommand(authProvided, tlsProvided bool, tlsConfig TLSConfig, func getPulsarctlCommand(authProvided, tlsProvided bool, tlsConfig TLSConfig, authConfig *v1alpha1.AuthConfig) []string { + return getPulsarctlCommandWithEnv(authProvided, tlsProvided, tlsConfig, authConfig, + defaultDownloadCommandEnvNames(), "") +} + +func getPulsarctlCommandWithEnv(authProvided, tlsProvided bool, tlsConfig TLSConfig, + authConfig *v1alpha1.AuthConfig, envNames downloadCommandEnvNames, oauth2MountPath string) []string { args := []string{ "export PATH=$PATH:/pulsar/bin && ", PulsarctlExecutableFile, "--admin-service-url", - "$webServiceURL", + "$" + envNames.webServiceURL, } // activate oauth2 for pulsarctl if authConfig != nil { @@ -765,13 +933,13 @@ func getPulsarctlCommand(authProvided, tlsProvided bool, tlsConfig TLSConfig, "set", "downloader", "--admin-service-url", - "$webServiceURL", + "$" + envNames.webServiceURL, "--issuer-endpoint", authConfig.OAuth2Config.IssuerURL, "--audience", authConfig.OAuth2Config.Audience, "--key-file", - authConfig.OAuth2Config.GetMountFile(), + getOAuth2MountFile(authConfig.OAuth2Config, oauth2MountPath), } if authConfig.OAuth2Config.Scope != "" { args = append(args, []string{ @@ -800,7 +968,7 @@ func getPulsarctlCommand(authProvided, tlsProvided bool, tlsConfig TLSConfig, "--auth-params", "'" + authConfig.GenericAuth.ClientAuthenticationParameters + "'", "--admin-service-url", - "$webServiceURL", + "$" + envNames.webServiceURL, } } } else if authProvided { @@ -810,26 +978,26 @@ func getPulsarctlCommand(authProvided, tlsProvided bool, tlsConfig TLSConfig, "oauth2", "activate", "--auth-params", - "$clientAuthenticationParameters || true", + "$" + envNames.clientAuthenticationParams + " || true", ") &&", PulsarctlExecutableFile, "--auth-plugin", - "$clientAuthenticationPlugin", + "$" + envNames.clientAuthenticationPlugin, "--auth-params", - "$clientAuthenticationParameters", + "$" + envNames.clientAuthenticationParams, "--admin-service-url", - "$webServiceURL", + "$" + envNames.webServiceURL, } } // Use traditional way - if reflect.ValueOf(tlsConfig).IsNil() { + if isNilTLSConfig(tlsConfig) { if tlsProvided { args = append(args, []string{ - "--tls-allow-insecure=${tlsAllowInsecureConnection:-" + DefaultForAllowInsecure + "}", - "--tls-enable-hostname-verification=${tlsHostnameVerificationEnable:-" + DefaultForEnableHostNameVerification + "}", + "--tls-allow-insecure=${" + envNames.tlsAllowInsecure + ":-" + DefaultForAllowInsecure + "}", + "--tls-enable-hostname-verification=${" + envNames.tlsHostnameVerification + ":-" + DefaultForEnableHostNameVerification + "}", "--tls-trust-cert-path", - "$tlsTrustCertsFilePath", + "$" + envNames.tlsTrustCertsFilePath, }...) } } else { @@ -925,15 +1093,21 @@ func getCleanUpCommand(hasPulsarctl, authProvided, tlsProvided bool, tlsConfig T func GetDownloadCommand(downloadPath, componentPackage string, hasPulsarctl, hasWget, authProvided, tlsProvided bool, tlsConfig TLSConfig, authConfig *v1alpha1.AuthConfig) []string { + return GetDownloadCommandWithEnv(downloadPath, componentPackage, hasPulsarctl, hasWget, authProvided, tlsProvided, tlsConfig, authConfig, + defaultDownloadCommandEnvNames(), "") +} + +func GetDownloadCommandWithEnv(downloadPath, componentPackage string, hasPulsarctl, hasWget, authProvided, tlsProvided bool, + tlsConfig TLSConfig, authConfig *v1alpha1.AuthConfig, envNames downloadCommandEnvNames, oauth2MountPath string) []string { var args []string if hasHTTPPrefix(downloadPath) && hasWget { args = append(args, "wget", downloadPath, "-O", componentPackage) return args } if hasPulsarctl { - args = getPulsarctlCommand(authProvided, tlsProvided, tlsConfig, authConfig) + args = getPulsarctlCommandWithEnv(authProvided, tlsProvided, tlsConfig, authConfig, envNames, oauth2MountPath) } else { - args = getPulsarAdminCommand(authProvided, tlsProvided, tlsConfig, authConfig) + args = getPulsarAdminCommandWithEnv(authProvided, tlsProvided, tlsConfig, authConfig, envNames, oauth2MountPath) } if hasPackageNamePrefix(downloadPath) { args = append(args, []string{ @@ -1474,7 +1648,7 @@ func getSharedArgs(details, clusterName, uid string, authProvided bool, tlsProvi } // Use traditional way - if reflect.ValueOf(tlsConfig).IsNil() { + if isNilTLSConfig(tlsConfig) { if tlsProvided { args = append(args, []string{ "--use_tls", @@ -1663,9 +1837,14 @@ func generateBasicContainerEnv(secrets map[string]v1alpha1.SecretRef, env []core } func GenerateContainerEnvFrom(messagingConfig string, authSecret string, tlsSecret string) []corev1.EnvFromSource { + return GenerateContainerEnvFromWithPrefix(messagingConfig, authSecret, tlsSecret, "") +} + +func GenerateContainerEnvFromWithPrefix(messagingConfig string, authSecret string, tlsSecret string, prefix string) []corev1.EnvFromSource { var envs []corev1.EnvFromSource if messagingConfig != "" { envs = append(envs, corev1.EnvFromSource{ + Prefix: prefix, ConfigMapRef: &corev1.ConfigMapEnvSource{ LocalObjectReference: corev1.LocalObjectReference{Name: messagingConfig}, }, @@ -1674,6 +1853,7 @@ func GenerateContainerEnvFrom(messagingConfig string, authSecret string, tlsSecr if authSecret != "" { envs = append(envs, corev1.EnvFromSource{ + Prefix: prefix, SecretRef: &corev1.SecretEnvSource{ LocalObjectReference: corev1.LocalObjectReference{Name: authSecret}, }, @@ -1682,6 +1862,7 @@ func GenerateContainerEnvFrom(messagingConfig string, authSecret string, tlsSecr if tlsSecret != "" { envs = append(envs, corev1.EnvFromSource{ + Prefix: prefix, SecretRef: &corev1.SecretEnvSource{ LocalObjectReference: corev1.LocalObjectReference{Name: tlsSecret}, }, @@ -1867,12 +2048,55 @@ func generateVolumeMountFromTLSConfig(tlsConfig TLSConfig) corev1.VolumeMount { } func generateVolumeMountFromOAuth2Config(config *v1alpha1.OAuth2Config) corev1.VolumeMount { + return generateVolumeMountFromOAuth2ConfigWithMountPath(config, config.GetMountPath()) +} + +func generateVolumeMountFromOAuth2ConfigWithMountPath(config *v1alpha1.OAuth2Config, mountPath string) corev1.VolumeMount { return corev1.VolumeMount{ Name: generateVolumeNameFromOAuth2Config(config), - MountPath: config.GetMountPath(), + MountPath: mountPath, } } +func appendPackageServiceVolumes(volumes []corev1.Volume, packageService *v1alpha1.PulsarMessaging) []corev1.Volume { + if packageService == nil { + return volumes + } + if packageService.TLSConfig != nil { + packageTLSConfig := &tlsConfigWithCustomMountPath{ + TLSConfig: packageService.TLSConfig, + mountPath: PackageTLSMountPath, + } + if packageTLSConfig.HasSecretVolume() { + volumes = appendVolumeIfNotExists(volumes, generateVolumeFromTLSConfig(packageTLSConfig)) + } + } + if packageService.AuthConfig != nil && packageService.AuthConfig.OAuth2Config != nil { + volumes = appendVolumeIfNotExists(volumes, generateVolumeFromOAuth2Config(packageService.AuthConfig.OAuth2Config)) + } + return volumes +} + +func appendPackageServiceVolumeMounts(mounts []corev1.VolumeMount, packageService *v1alpha1.PulsarMessaging) []corev1.VolumeMount { + if packageService == nil { + return mounts + } + if packageService.TLSConfig != nil { + packageTLSConfig := &tlsConfigWithCustomMountPath{ + TLSConfig: packageService.TLSConfig, + mountPath: PackageTLSMountPath, + } + if packageTLSConfig.HasSecretVolume() { + mounts = appendVolumeMountIfNotExists(mounts, generateVolumeMountFromTLSConfig(packageTLSConfig)) + } + } + if packageService.AuthConfig != nil && packageService.AuthConfig.OAuth2Config != nil { + mounts = appendVolumeMountIfNotExists(mounts, generateVolumeMountFromOAuth2ConfigWithMountPath( + packageService.AuthConfig.OAuth2Config, PackageOAuth2MountPath)) + } + return mounts +} + func generateContainerVolumeMountsFromConsumerConfigs(confs map[string]v1alpha1.ConsumerConfig) []corev1.VolumeMount { mounts := []corev1.VolumeMount{} if len(confs) > 0 { @@ -1969,7 +2193,7 @@ func GenerateContainerVolumeMounts(volumeMounts []corev1.VolumeMount, producerCo MountPath: "/pulsar/logs/functions", }) } - if !reflect.ValueOf(tlsConfig).IsNil() && tlsConfig.HasSecretVolume() { + if !isNilTLSConfig(tlsConfig) && tlsConfig.HasSecretVolume() { mounts = append(mounts, generateVolumeMountFromTLSConfig(tlsConfig)) } if authConfig != nil { @@ -1992,7 +2216,7 @@ func GeneratePodVolumes(podVolumes []corev1.Volume, producerConf *v1alpha1.Produ if agent == v1alpha1.SIDECAR { volumes = append(volumes, generateFilebeatVolumes()...) } - if !reflect.ValueOf(tlsConfig).IsNil() && tlsConfig.HasSecretVolume() { + if !isNilTLSConfig(tlsConfig) && tlsConfig.HasSecretVolume() { volumes = append(volumes, generateVolumeFromTLSConfig(tlsConfig)) } if authConfig != nil { @@ -2006,6 +2230,24 @@ func GeneratePodVolumes(podVolumes []corev1.Volume, producerConf *v1alpha1.Produ return volumes } +func appendVolumeIfNotExists(volumes []corev1.Volume, volume corev1.Volume) []corev1.Volume { + for _, existing := range volumes { + if existing.Name == volume.Name { + return volumes + } + } + return append(volumes, volume) +} + +func appendVolumeMountIfNotExists(mounts []corev1.VolumeMount, mount corev1.VolumeMount) []corev1.VolumeMount { + for _, existing := range mounts { + if existing.Name == mount.Name && existing.MountPath == mount.MountPath && existing.SubPath == mount.SubPath { + return mounts + } + } + return append(mounts, mount) +} + func mergeMaps(labels ...map[string]string) map[string]string { merged := make(map[string]string) diff --git a/controllers/spec/common_test.go b/controllers/spec/common_test.go index 7bdbeae8a..7e1b2b154 100644 --- a/controllers/spec/common_test.go +++ b/controllers/spec/common_test.go @@ -420,7 +420,7 @@ func TestGetSourceRunnerImage(t *testing.T) { func TestMakeGoFunctionCommand(t *testing.T) { function := makeGoFunctionSample(TestFunctionName) - commands := MakeGoFunctionCommand("", "/pulsar/go-func", function) + commands := MakeGoFunctionCommand("", "/pulsar/go-func", function, newDownloadServiceConfig(nil, function.Spec.Pulsar)) assert.Equal(t, commands[0], "bash") assert.Equal(t, commands[1], "-c") assert.True(t, strings.HasPrefix(commands[2], "SHARD_ID=${POD_NAME##*-} && echo shardId=${SHARD_ID}")) diff --git a/controllers/spec/function.go b/controllers/spec/function.go index fabee4f6d..221f848ec 100644 --- a/controllers/spec/function.go +++ b/controllers/spec/function.go @@ -66,10 +66,10 @@ func MakeFunctionStatefulSet(ctx context.Context, cli client.Client, function *v function.Spec.ImagePullPolicy = runnerImagePullPolicy labels := makeFunctionLabels(function) + downloadConfig := newDownloadServiceConfig(function.Spec.PackageService, function.Spec.Pulsar) statefulSet := MakeStatefulSet(objectMeta, function.Spec.Replicas, function.Spec.DownloaderImage, makeFunctionContainer(function), makeFunctionVolumes(function, function.Spec.Pulsar.AuthConfig), labels, function.Spec.Pod, - function.Spec.Pulsar.AuthConfig, function.Spec.Pulsar.TLSConfig, function.Spec.Pulsar.PulsarConfig, function.Spec.Pulsar.AuthSecret, - function.Spec.Pulsar.TLSSecret, function.Spec.Java, function.Spec.Python, function.Spec.Golang, function.Spec.Pod.Env, + function.Spec.Pulsar, downloadConfig, function.Spec.Java, function.Spec.Python, function.Spec.Golang, function.Spec.Pod.Env, function.Spec.LogTopic, function.Spec.FilebeatImage, function.Spec.LogTopicAgent, function.Spec.VolumeMounts, function.Spec.VolumeClaimTemplates, function.Spec.PersistentVolumeClaimRetentionPolicy) @@ -149,13 +149,14 @@ func MakeFunctionCleanUpJob(function *v1alpha1.Function) *v1.Job { } func makeFunctionVolumes(function *v1alpha1.Function, authConfig *v1alpha1.AuthConfig) []corev1.Volume { - return GeneratePodVolumes(function.Spec.Pod.Volumes, + volumes := GeneratePodVolumes(function.Spec.Pod.Volumes, function.Spec.Output.ProducerConf, function.Spec.Input.SourceSpecs, function.Spec.Pulsar.TLSConfig, authConfig, GetRuntimeLogConfigNames(function.Spec.Java, function.Spec.Python, function.Spec.Golang), function.Spec.LogTopicAgent) + return appendPackageServiceVolumes(volumes, function.Spec.PackageService) } func makeFunctionVolumeMounts(function *v1alpha1.Function, authConfig *v1alpha1.AuthConfig) []corev1.VolumeMount { @@ -176,10 +177,17 @@ func makeFunctionContainer(function *v1alpha1.Function) *corev1.Container { probe := MakeLivenessProbe(function.Spec.Pod.Liveness) allowPrivilegeEscalation := false mounts := makeFunctionVolumeMounts(function, function.Spec.Pulsar.AuthConfig) + mounts = appendPackageServiceVolumeMounts(mounts, function.Spec.PackageService) if utils.EnableInitContainers { mounts = append(mounts, generateDownloaderVolumeMountsForRuntime(function.Spec.Java, function.Spec.Python, function.Spec.Golang, function.Spec.GenericRuntime)...) } + envFrom := GenerateContainerEnvFrom(function.Spec.Pulsar.PulsarConfig, function.Spec.Pulsar.AuthSecret, + function.Spec.Pulsar.TLSSecret) + if function.Spec.PackageService != nil { + envFrom = append(envFrom, GenerateContainerEnvFromWithPrefix(function.Spec.PackageService.PulsarConfig, + function.Spec.PackageService.AuthSecret, function.Spec.PackageService.TLSSecret, PackageServiceEnvPrefix)...) + } return &corev1.Container{ // TODO new container to pull user code image and upload jars into bookkeeper Name: FunctionContainerName, @@ -189,10 +197,9 @@ func makeFunctionContainer(function *v1alpha1.Function) *corev1.Container { Env: generateContainerEnv(function), Resources: function.Spec.Resources, ImagePullPolicy: imagePullPolicy, - EnvFrom: GenerateContainerEnvFrom(function.Spec.Pulsar.PulsarConfig, function.Spec.Pulsar.AuthSecret, - function.Spec.Pulsar.TLSSecret), - VolumeMounts: mounts, - LivenessProbe: probe, + EnvFrom: envFrom, + VolumeMounts: mounts, + LivenessProbe: probe, SecurityContext: &corev1.SecurityContext{ Capabilities: &corev1.Capabilities{ Drop: []corev1.Capability{"ALL"}, @@ -222,6 +229,7 @@ func makeFunctionLabels(function *v1alpha1.Function) map[string]string { func makeFunctionCommand(function *v1alpha1.Function) []string { spec := function.Spec + downloadConfig := newDownloadServiceConfig(spec.PackageService, spec.Pulsar) connectorsDirectory := "" if spec.SourceConfig != nil || spec.SinkConfig != nil { @@ -254,7 +262,7 @@ func makeFunctionCommand(function *v1alpha1.Function) []string { connectorsDirectory, string(function.UID), spec.Resources.Limits.Memory(), - spec.Java.JavaOpts, hasPulsarctl, hasWget, + spec.Java.JavaOpts, hasPulsarctl, hasWget, downloadConfig, spec.Pulsar.AuthSecret != "", spec.Pulsar.TLSSecret != "", spec.SecretsMap, spec.StateConfig, spec.Pulsar.TLSConfig, spec.Pulsar.AuthConfig, spec.MaxPendingAsyncRequests, @@ -266,21 +274,21 @@ func makeFunctionCommand(function *v1alpha1.Function) []string { return MakePythonFunctionCommand(spec.Python.PyLocation, mountPath, spec.Name, spec.ClusterName, generatePythonLogConfigCommand(spec.Name, spec.Python, spec.LogTopicAgent), - generateFunctionDetailsInJSON(function), string(function.UID), hasPulsarctl, hasWget, + generateFunctionDetailsInJSON(function), string(function.UID), hasPulsarctl, hasWget, downloadConfig, spec.Pulsar.AuthSecret != "", spec.Pulsar.TLSSecret != "", spec.SecretsMap, spec.StateConfig, spec.Pulsar.TLSConfig, spec.Pulsar.AuthConfig) } } else if spec.Golang != nil { if spec.Golang.Go != "" { mountPath := extractMountPath(spec.Golang.Go) - return MakeGoFunctionCommand(spec.Golang.GoLocation, mountPath, function) + return MakeGoFunctionCommand(spec.Golang.GoLocation, mountPath, function, downloadConfig) } } else if spec.GenericRuntime != nil { if spec.GenericRuntime.FunctionFile != "" { mountPath := extractMountPath(spec.GenericRuntime.FunctionFile) return MakeGenericFunctionCommand(spec.GenericRuntime.FunctionFileLocation, mountPath, spec.GenericRuntime.Language, spec.ClusterName, - generateFunctionDetailsInJSON(function), string(function.UID), + generateFunctionDetailsInJSON(function), string(function.UID), downloadConfig, spec.Pulsar.AuthSecret != "", spec.Pulsar.TLSSecret != "", function.Spec.SecretsMap, function.Spec.StateConfig, function.Spec.Pulsar.TLSConfig, function.Spec.Pulsar.AuthConfig) } diff --git a/controllers/spec/function_test.go b/controllers/spec/function_test.go index 0c14af74c..c881c8934 100644 --- a/controllers/spec/function_test.go +++ b/controllers/spec/function_test.go @@ -126,10 +126,10 @@ func TestInitContainerDownloader(t *testing.T) { function.Spec.ImagePullPolicy = runnerImagePullPolicy labels := makeFunctionLabels(function) + downloadConfig := newDownloadServiceConfig(function.Spec.PackageService, function.Spec.Pulsar) statefulSet := MakeStatefulSet(objectMeta, function.Spec.Replicas, function.Spec.DownloaderImage, makeFunctionContainer(function), makeFunctionVolumes(function, function.Spec.Pulsar.AuthConfig), labels, function.Spec.Pod, - function.Spec.Pulsar.AuthConfig, function.Spec.Pulsar.TLSConfig, function.Spec.Pulsar.PulsarConfig, function.Spec.Pulsar.AuthSecret, - function.Spec.Pulsar.TLSSecret, function.Spec.Java, function.Spec.Python, function.Spec.Golang, function.Spec.Pod.Env, + function.Spec.Pulsar, downloadConfig, function.Spec.Java, function.Spec.Python, function.Spec.Golang, function.Spec.Pod.Env, function.Spec.LogTopic, function.Spec.FilebeatImage, function.Spec.LogTopicAgent, function.Spec.VolumeMounts, function.Spec.VolumeClaimTemplates, function.Spec.PersistentVolumeClaimRetentionPolicy) @@ -231,3 +231,81 @@ func TestJavaFunctionCommandWithConnectorOverrides(t *testing.T) { producerConfig := sinkConfigsMap["producerConfigProperties"].(map[string]interface{}) assert.Equal(t, producerConfig["enable.idempotence"], true) } + +func TestFunctionPackageServiceDownloadCommandAndPodWiring(t *testing.T) { + previous := utils.EnableInitContainers + defer func() { + utils.EnableInitContainers = previous + }() + utils.EnableInitContainers = false + function := makeFunctionSamplePackageURL("package-service-test") + function.Spec.Pulsar.PulsarConfig = "runtime-pulsar" + function.Spec.Pulsar.AuthSecret = "runtime-auth" + function.Spec.Pulsar.TLSSecret = "runtime-tls" + function.Spec.PackageService = &v1alpha1.PulsarMessaging{ + PulsarConfig: "package-pulsar", + AuthSecret: "package-auth", + TLSSecret: "package-tls", + AuthConfig: &v1alpha1.AuthConfig{ + OAuth2Config: &v1alpha1.OAuth2Config{ + Audience: "package-audience", + IssuerURL: "https://issuer.example", + KeySecretName: "package-oauth", + KeySecretKey: "auth.json", + }, + }, + TLSConfig: &v1alpha1.PulsarTLSConfig{ + Enabled: true, + AllowInsecure: true, + HostnameVerification: false, + CertSecretName: "package-certs", + CertSecretKey: "ca.crt", + }, + } + + command := makeFunctionCommand(function) + assert.Assert(t, len(command) == 3, "commands should be 3 but got %d", len(command)) + assert.Assert(t, strings.Contains(command[2], "$PACKAGE_webServiceURL"), + "download command should use package service env, got %s", command[2]) + assert.Assert(t, !strings.Contains(command[2], "$webServiceURL packages download"), + "download command should not use runtime messaging env, got %s", command[2]) + + container := makeFunctionContainer(function) + assert.Assert(t, len(container.EnvFrom) > 0, "envFrom should not be empty") + + hasPackageEnvPrefix := false + for _, source := range container.EnvFrom { + if source.Prefix == PackageServiceEnvPrefix { + hasPackageEnvPrefix = true + } + } + assert.Assert(t, hasPackageEnvPrefix, "container should include package service envFrom with prefix") + + hasPackageTLSMount := false + hasPackageOAuthMount := false + for _, mount := range container.VolumeMounts { + if mount.MountPath == PackageTLSMountPath { + hasPackageTLSMount = true + } + if mount.MountPath == PackageOAuth2MountPath { + hasPackageOAuthMount = true + } + } + assert.Assert(t, hasPackageTLSMount, "container should include package service tls mount") + assert.Assert(t, hasPackageOAuthMount, "container should include package service oauth2 mount") +} + +func TestFunctionPackageServiceDownloadFallbackToMessaging(t *testing.T) { + previous := utils.EnableInitContainers + defer func() { + utils.EnableInitContainers = previous + }() + utils.EnableInitContainers = false + function := makeFunctionSamplePackageURL("package-service-fallback-test") + command := makeFunctionCommand(function) + assert.Assert(t, len(command) == 3, "commands should be 3 but got %d", len(command)) + assert.Assert(t, strings.Contains(command[2], "$webServiceURL"), + "download command should fallback to messaging env, got %s", command[2]) + assert.Assert(t, !strings.Contains(command[2], "$PACKAGE_webServiceURL"), + "download command should not use package service env without package service, got %s", command[2]) +} diff --git a/controllers/spec/sink.go b/controllers/spec/sink.go index 9dc736673..8c47bb557 100644 --- a/controllers/spec/sink.go +++ b/controllers/spec/sink.go @@ -62,9 +62,10 @@ func MakeSinkStatefulSet(ctx context.Context, cli client.Client, sink *v1alpha1. runnerImagePullPolicy := getSinkRunnerImagePullPolicy() sink.Spec.ImagePullPolicy = runnerImagePullPolicy + downloadConfig := newDownloadServiceConfig(sink.Spec.PackageService, sink.Spec.Pulsar) statefulSet := MakeStatefulSet(objectMeta, sink.Spec.Replicas, sink.Spec.DownloaderImage, makeSinkContainer(sink), - makeSinkVolumes(sink, sink.Spec.Pulsar.AuthConfig), makeSinkLabels(sink), sink.Spec.Pod, sink.Spec.Pulsar.AuthConfig, - sink.Spec.Pulsar.TLSConfig, sink.Spec.Pulsar.PulsarConfig, sink.Spec.Pulsar.AuthSecret, sink.Spec.Pulsar.TLSSecret, + makeSinkVolumes(sink, sink.Spec.Pulsar.AuthConfig), makeSinkLabels(sink), sink.Spec.Pod, sink.Spec.Pulsar, + downloadConfig, sink.Spec.Java, sink.Spec.Python, sink.Spec.Golang, sink.Spec.Pod.Env, sink.Spec.LogTopic, sink.Spec.FilebeatImage, sink.Spec.LogTopicAgent, sink.Spec.VolumeMounts, nil, nil) @@ -106,9 +107,16 @@ func makeSinkContainer(sink *v1alpha1.Sink) *corev1.Container { probe := MakeLivenessProbe(sink.Spec.Pod.Liveness) allowPrivilegeEscalation := false mounts := makeSinkVolumeMounts(sink, sink.Spec.Pulsar.AuthConfig) + mounts = appendPackageServiceVolumeMounts(mounts, sink.Spec.PackageService) if utils.EnableInitContainers { mounts = append(mounts, generateDownloaderVolumeMountsForRuntime(sink.Spec.Java, nil, nil, nil)...) } + envFrom := GenerateContainerEnvFrom(sink.Spec.Pulsar.PulsarConfig, sink.Spec.Pulsar.AuthSecret, + sink.Spec.Pulsar.TLSSecret) + if sink.Spec.PackageService != nil { + envFrom = append(envFrom, GenerateContainerEnvFromWithPrefix(sink.Spec.PackageService.PulsarConfig, + sink.Spec.PackageService.AuthSecret, sink.Spec.PackageService.TLSSecret, PackageServiceEnvPrefix)...) + } return &corev1.Container{ // TODO new container to pull user code image and upload jars into bookkeeper Name: SinkContainerName, @@ -118,10 +126,9 @@ func makeSinkContainer(sink *v1alpha1.Sink) *corev1.Container { Env: generateBasicContainerEnv(sink.Spec.SecretsMap, sink.Spec.Pod.Env), Resources: sink.Spec.Resources, ImagePullPolicy: imagePullPolicy, - EnvFrom: GenerateContainerEnvFrom(sink.Spec.Pulsar.PulsarConfig, sink.Spec.Pulsar.AuthSecret, - sink.Spec.Pulsar.TLSSecret), - VolumeMounts: mounts, - LivenessProbe: probe, + EnvFrom: envFrom, + VolumeMounts: mounts, + LivenessProbe: probe, SecurityContext: &corev1.SecurityContext{ Capabilities: &corev1.Capabilities{ Drop: []corev1.Capability{"ALL"}, @@ -197,7 +204,7 @@ func MakeSinkCleanUpJob(sink *v1alpha1.Sink) *v1.Job { } func makeSinkVolumes(sink *v1alpha1.Sink, authConfig *v1alpha1.AuthConfig) []corev1.Volume { - return GeneratePodVolumes( + volumes := GeneratePodVolumes( sink.Spec.Pod.Volumes, nil, sink.Spec.Input.SourceSpecs, @@ -205,6 +212,7 @@ func makeSinkVolumes(sink *v1alpha1.Sink, authConfig *v1alpha1.AuthConfig) []cor authConfig, GetRuntimeLogConfigNames(sink.Spec.Java, sink.Spec.Python, sink.Spec.Golang), sink.Spec.LogTopicAgent) + return appendPackageServiceVolumes(volumes, sink.Spec.PackageService) } func makeSinkVolumeMounts(sink *v1alpha1.Sink, authConfig *v1alpha1.AuthConfig) []corev1.VolumeMount { @@ -220,6 +228,7 @@ func makeSinkVolumeMounts(sink *v1alpha1.Sink, authConfig *v1alpha1.AuthConfig) func MakeSinkCommand(sink *v1alpha1.Sink) []string { spec := sink.Spec + downloadConfig := newDownloadServiceConfig(spec.PackageService, spec.Pulsar) hasPulsarctl := sink.Spec.ImageHasPulsarctl hasWget := sink.Spec.ImageHasWget if match, _ := regexp.MatchString(RunnerImageHasPulsarctl, sink.Spec.Image); match { @@ -244,7 +253,7 @@ func MakeSinkCommand(sink *v1alpha1.Sink) []string { "", string(sink.UID), spec.Resources.Limits.Memory(), - spec.Java.JavaOpts, hasPulsarctl, hasWget, spec.Pulsar.AuthSecret != "", spec.Pulsar.TLSSecret != "", + spec.Java.JavaOpts, hasPulsarctl, hasWget, downloadConfig, spec.Pulsar.AuthSecret != "", spec.Pulsar.TLSSecret != "", spec.SecretsMap, spec.StateConfig, spec.Pulsar.TLSConfig, spec.Pulsar.AuthConfig, nil, GenerateJavaLogConfigFileName(spec.Java), instancePath, entryClass) } diff --git a/controllers/spec/source.go b/controllers/spec/source.go index f756b084f..04137f7f1 100644 --- a/controllers/spec/source.go +++ b/controllers/spec/source.go @@ -62,9 +62,10 @@ func MakeSourceStatefulSet(ctx context.Context, cli client.Client, source *v1alp runnerImagePullPolicy := getSourceRunnerImagePullPolicy() source.Spec.ImagePullPolicy = runnerImagePullPolicy + downloadConfig := newDownloadServiceConfig(source.Spec.PackageService, source.Spec.Pulsar) statefulSet := MakeStatefulSet(objectMeta, source.Spec.Replicas, source.Spec.DownloaderImage, makeSourceContainer(source), - makeSourceVolumes(source, source.Spec.Pulsar.AuthConfig), makeSourceLabels(source), source.Spec.Pod, source.Spec.Pulsar.AuthConfig, - source.Spec.Pulsar.TLSConfig, source.Spec.Pulsar.PulsarConfig, source.Spec.Pulsar.AuthSecret, source.Spec.Pulsar.TLSSecret, + makeSourceVolumes(source, source.Spec.Pulsar.AuthConfig), makeSourceLabels(source), source.Spec.Pod, source.Spec.Pulsar, + downloadConfig, source.Spec.Java, source.Spec.Python, source.Spec.Golang, source.Spec.Pod.Env, source.Spec.LogTopic, source.Spec.FilebeatImage, source.Spec.LogTopicAgent, source.Spec.VolumeMounts, nil, nil) @@ -101,9 +102,16 @@ func makeSourceContainer(source *v1alpha1.Source) *corev1.Container { probe := MakeLivenessProbe(source.Spec.Pod.Liveness) allowPrivilegeEscalation := false mounts := makeSourceVolumeMounts(source, source.Spec.Pulsar.AuthConfig) + mounts = appendPackageServiceVolumeMounts(mounts, source.Spec.PackageService) if utils.EnableInitContainers { mounts = append(mounts, generateDownloaderVolumeMountsForRuntime(source.Spec.Java, nil, nil, nil)...) } + envFrom := GenerateContainerEnvFrom(source.Spec.Pulsar.PulsarConfig, source.Spec.Pulsar.AuthSecret, + source.Spec.Pulsar.TLSSecret) + if source.Spec.PackageService != nil { + envFrom = append(envFrom, GenerateContainerEnvFromWithPrefix(source.Spec.PackageService.PulsarConfig, + source.Spec.PackageService.AuthSecret, source.Spec.PackageService.TLSSecret, PackageServiceEnvPrefix)...) + } return &corev1.Container{ // TODO new container to pull user code image and upload jars into bookkeeper Name: SourceContainerName, @@ -113,10 +121,9 @@ func makeSourceContainer(source *v1alpha1.Source) *corev1.Container { Env: generateBasicContainerEnv(source.Spec.SecretsMap, source.Spec.Pod.Env), Resources: source.Spec.Resources, ImagePullPolicy: imagePullPolicy, - EnvFrom: GenerateContainerEnvFrom(source.Spec.Pulsar.PulsarConfig, source.Spec.Pulsar.AuthSecret, - source.Spec.Pulsar.TLSSecret), - VolumeMounts: mounts, - LivenessProbe: probe, + EnvFrom: envFrom, + VolumeMounts: mounts, + LivenessProbe: probe, SecurityContext: &corev1.SecurityContext{ Capabilities: &corev1.Capabilities{ Drop: []corev1.Capability{"ALL"}, @@ -143,7 +150,7 @@ func makeSourceLabels(source *v1alpha1.Source) map[string]string { } func makeSourceVolumes(source *v1alpha1.Source, authConfig *v1alpha1.AuthConfig) []corev1.Volume { - return GeneratePodVolumes( + volumes := GeneratePodVolumes( source.Spec.Pod.Volumes, source.Spec.Output.ProducerConf, nil, @@ -151,6 +158,7 @@ func makeSourceVolumes(source *v1alpha1.Source, authConfig *v1alpha1.AuthConfig) authConfig, GetRuntimeLogConfigNames(source.Spec.Java, source.Spec.Python, source.Spec.Golang), source.Spec.LogTopicAgent) + return appendPackageServiceVolumes(volumes, source.Spec.PackageService) } func makeSourceVolumeMounts(source *v1alpha1.Source, authConfig *v1alpha1.AuthConfig) []corev1.VolumeMount { @@ -166,6 +174,7 @@ func makeSourceVolumeMounts(source *v1alpha1.Source, authConfig *v1alpha1.AuthCo func makeSourceCommand(source *v1alpha1.Source) []string { spec := source.Spec + downloadConfig := newDownloadServiceConfig(spec.PackageService, spec.Pulsar) hasPulsarctl := source.Spec.ImageHasPulsarctl hasWget := source.Spec.ImageHasWget if match, _ := regexp.MatchString(RunnerImageHasPulsarctl, source.Spec.Image); match { @@ -191,7 +200,7 @@ func makeSourceCommand(source *v1alpha1.Source) []string { "", string(source.UID), spec.Resources.Limits.Memory(), - spec.Java.JavaOpts, hasPulsarctl, hasWget, spec.Pulsar.AuthSecret != "", spec.Pulsar.TLSSecret != "", + spec.Java.JavaOpts, hasPulsarctl, hasWget, downloadConfig, spec.Pulsar.AuthSecret != "", spec.Pulsar.TLSSecret != "", spec.SecretsMap, spec.StateConfig, spec.Pulsar.TLSConfig, spec.Pulsar.AuthConfig, nil, GenerateJavaLogConfigFileName(spec.Java), instancePath, entryClass) } diff --git a/docs/design/package-service-download-design.md b/docs/design/package-service-download-design.md new file mode 100644 index 000000000..c7a2cde90 --- /dev/null +++ b/docs/design/package-service-download-design.md @@ -0,0 +1,158 @@ +# Design: Separate Package Download Service (`PackageService`) + +## Summary + +This change adds optional `spec.packageService` support to `Function`, `Source`, and `Sink` so package download can use a dedicated Pulsar admin service configuration instead of reusing runtime `spec.pulsar` (`Messaging`). + +If `packageService` is set, package download uses it. +If `packageService` is not set, behavior remains unchanged and falls back to `spec.pulsar`. + +## Motivation + +Previously, package download and runtime messaging both depended on the same Pulsar config (`spec.pulsar`). +That made it hard to: + +- Use a different admin endpoint/auth for package retrieval +- Isolate package management credentials from runtime credentials +- Keep runtime broker settings stable while routing package download differently + +## Goals + +- Add an optional CRD field for dedicated package-download Pulsar config. +- Keep runtime messaging behavior unchanged. +- Preserve backward compatibility with fallback to `spec.pulsar`. +- Support both init-container download mode and in-container download mode. + +## Non-goals + +- No new Pulsar cluster deployment requirement. +- No change to cleanup subscription logic. +- No change to existing `spec.pulsar` required-field semantics. + +## API Changes + +Added to: + +- `FunctionSpec` +- `SourceSpec` +- `SinkSpec` + +Field: + +- `packageService *PulsarMessaging \`json:"packageService,omitempty"\`` + +Validation: + +- `packageService` is optional. +- When provided, `packageService.pulsarConfig` must be non-empty. +- Existing `spec.pulsar` validation remains required as before. + +## Controller/Spec Design + +### Service Selection + +A download service config is derived as: + +1. Use `spec.packageService` when non-nil. +2. Else use `spec.pulsar`. + +This selected config is used only for package download command/env/volume wiring. +Runtime messaging still uses `spec.pulsar`. + +### Command Generation + +`GetDownloadCommand` now has an env-aware path (`GetDownloadCommandWithEnv`) used by runtime and downloader init-container flows. + +- Supports `pulsarctl` and `pulsar-admin` download commands: + - `packages download ...` + - `functions download ...` +- Supports HTTP download path as before. + +### Env Isolation + +To avoid conflicts when both runtime and package env refs are present in the same container, `packageService` envs use prefixed env names: + +- Prefix: `PACKAGE_` +- Examples: `PACKAGE_webServiceURL`, `PACKAGE_clientAuthenticationParameters` + +Command generation for package download uses these prefixed env names. + +### Volumes and VolumeMounts + +When `packageService` is provided, required TLS/OAuth2 mounts for package download are added with dedicated mount paths: + +- OAuth2: `/etc/oauth2-package-service` +- TLS: `/etc/tls/pulsar-functions-package-service` + +This avoids mount-path collisions with runtime auth/TLS mounts. + +Volume/volumeMount de-dup helpers were added to avoid duplicate entries. + +### Init-container and Non-init-container Modes + +Both modes are supported: + +- **Init-container mode**: downloader init container uses selected download service env/auth/tls config. +- **Non-init-container mode**: workload container command prepends download command using selected download service and gets required env/mount injection. + +## Backward Compatibility + +- Existing manifests without `packageService` behave exactly as before. +- Existing required `spec.pulsar` contract remains unchanged. +- Existing command paths and runtime behavior are preserved unless `packageService` is set. + +## Testing + +### Unit Tests + +Updated/added tests in `controllers/spec` for: + +- New signatures and fallback behavior +- `packageService` download command env selection +- Runtime container package-service env/mount injection + +### Integration Tests + +Added new oauth2 integration case: + +- `.ci/tests/integration-oauth2/cases/java-download-function-with-package-service` + +Coverage: + +- Deploy function with both `pulsar` and `packageService`. +- Verify StatefulSet has package-service-specific env prefix and mounts. +- Verify end-to-end message processing succeeds. +- Reuses the same Pulsar cluster for package provider (no extra cluster). + +Test suites updated: + +- `.ci/tests/integration-oauth2/e2e.yaml` +- `.ci/tests/integration-oauth2/e2e_with_downloader.yaml` + +## Risks and Mitigations + +Risk: + +- Env or mount collisions when both runtime and package service configs exist. + +Mitigation: + +- Introduced `PACKAGE_` env prefix and dedicated mount paths. +- Added de-dup logic for volumes/mounts. + +Risk: + +- Nil-interface panics with TLS config checks. + +Mitigation: + +- Replaced direct reflection checks with a centralized nil-safe helper (`isNilTLSConfig`). + +## Operational Notes + +- Users can migrate incrementally by adding `packageService` only where needed. +- If `packageService` is removed, the system transparently falls back to `spec.pulsar`. + +## Conclusion + +This design decouples package download connectivity from runtime messaging while preserving existing behavior for current users. It improves credential isolation and deployment flexibility with minimal API surface expansion and strong backward compatibility. diff --git a/go.mod b/go.mod index cdf3380e5..ee976d333 100644 --- a/go.mod +++ b/go.mod @@ -1,17 +1,17 @@ module github.com/streamnative/function-mesh -go 1.24.11 +go 1.24.13 require ( github.com/apache/pulsar-client-go v0.17.0 - github.com/go-logr/logr v1.4.2 + github.com/go-logr/logr v1.4.3 github.com/golang/protobuf v1.5.4 github.com/onsi/ginkgo v1.16.5 github.com/onsi/gomega v1.35.1 github.com/prometheus/client_golang v1.20.5 github.com/streamnative/function-mesh/api v0.0.0 github.com/streamnative/pulsarctl v0.6.0 - github.com/stretchr/testify v1.10.0 + github.com/stretchr/testify v1.11.1 google.golang.org/protobuf v1.36.6 gotest.tools v2.2.0+incompatible k8s.io/api v0.32.3 @@ -83,14 +83,14 @@ require ( github.com/spf13/cobra v1.8.1 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/stoewer/go-strcase v1.2.0 // indirect - go.opentelemetry.io/auto/sdk v1.1.0 // indirect + go.opentelemetry.io/auto/sdk v1.2.1 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.59.0 // indirect - go.opentelemetry.io/otel v1.35.0 // indirect + go.opentelemetry.io/otel v1.40.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.28.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.27.0 // indirect - go.opentelemetry.io/otel/metric v1.35.0 // indirect - go.opentelemetry.io/otel/sdk v1.35.0 // indirect - go.opentelemetry.io/otel/trace v1.35.0 // indirect + go.opentelemetry.io/otel/metric v1.40.0 // indirect + go.opentelemetry.io/otel/sdk v1.40.0 // indirect + go.opentelemetry.io/otel/trace v1.40.0 // indirect go.opentelemetry.io/proto/otlp v1.3.1 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.27.0 // indirect @@ -98,7 +98,7 @@ require ( golang.org/x/net v0.39.0 // indirect golang.org/x/oauth2 v0.29.0 // indirect golang.org/x/sync v0.13.0 // indirect - golang.org/x/sys v0.32.0 // indirect + golang.org/x/sys v0.40.0 // indirect golang.org/x/term v0.31.0 // indirect golang.org/x/text v0.24.0 // indirect golang.org/x/time v0.10.0 // indirect diff --git a/go.sum b/go.sum index 2af1e5d6c..d27cfc218 100644 --- a/go.sum +++ b/go.sum @@ -53,8 +53,8 @@ github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeME github.com/go-jose/go-jose/v4 v4.0.5 h1:M6T8+mKZl/+fNNuFHvGIzDz7BTLQPIounk/b9dw3AaE= github.com/go-jose/go-jose/v4 v4.0.5/go.mod h1:s3P1lRrkT8igV8D9OjyL4WRyHvjB6a4JSllnOrmmBOA= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= -github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= -github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= +github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-logr/zapr v1.3.0 h1:XGdV8XW8zdwFiwOA2Dryh1gj2KRQyOOoNmBy4EplIcQ= @@ -195,8 +195,8 @@ github.com/prometheus/common v0.61.0 h1:3gv/GThfX0cV2lpO7gkTUwZru38mxevy90Bj8YFS github.com/prometheus/common v0.61.0/go.mod h1:zr29OCN/2BsJRaFwG8QOBr41D6kkchKbpeNH7pAjb/s= github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc= github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= -github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= -github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= +github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= +github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= @@ -221,28 +221,28 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= -github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= -github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= -go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= -go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= +go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64= +go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.59.0 h1:CV7UdSGJt/Ao6Gp4CXckLxVRRsRgDHoI8XjbL3PDl8s= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.59.0/go.mod h1:FRmFuRJfag1IZ2dPkHnEoSFVgTVPUd2qf5Vi69hLb8I= -go.opentelemetry.io/otel v1.35.0 h1:xKWKPxrxB6OtMCbmMY021CqC45J+3Onta9MqjhnusiQ= -go.opentelemetry.io/otel v1.35.0/go.mod h1:UEqy8Zp11hpkUrL73gSlELM0DupHoiq72dR+Zqel/+Y= +go.opentelemetry.io/otel v1.40.0 h1:oA5YeOcpRTXq6NN7frwmwFR0Cn3RhTVZvXsP4duvCms= +go.opentelemetry.io/otel v1.40.0/go.mod h1:IMb+uXZUKkMXdPddhwAHm6UfOwJyh4ct1ybIlV14J0g= go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.28.0 h1:3Q/xZUyC1BBkualc9ROb4G8qkH90LXEIICcs5zv1OYY= go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.28.0/go.mod h1:s75jGIWA9OfCMzF0xr+ZgfrB5FEbbV7UuYo32ahUiFI= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.27.0 h1:qFffATk0X+HD+f1Z8lswGiOQYKHRlzfmdJm0wEaVrFA= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.27.0/go.mod h1:MOiCmryaYtc+V0Ei+Tx9o5S1ZjA7kzLucuVuyzBZloQ= -go.opentelemetry.io/otel/metric v1.35.0 h1:0znxYu2SNyuMSQT4Y9WDWej0VpcsxkuklLa4/siN90M= -go.opentelemetry.io/otel/metric v1.35.0/go.mod h1:nKVFgxBZ2fReX6IlyW28MgZojkoAkJGaE8CpgeAU3oE= -go.opentelemetry.io/otel/sdk v1.35.0 h1:iPctf8iprVySXSKJffSS79eOjl9pvxV9ZqOWT0QejKY= -go.opentelemetry.io/otel/sdk v1.35.0/go.mod h1:+ga1bZliga3DxJ3CQGg3updiaAJoNECOgJREo9KHGQg= -go.opentelemetry.io/otel/sdk/metric v1.34.0 h1:5CeK9ujjbFVL5c1PhLuStg1wxA7vQv7ce1EK0Gyvahk= -go.opentelemetry.io/otel/sdk/metric v1.34.0/go.mod h1:jQ/r8Ze28zRKoNRdkjCZxfs6YvBTG1+YIqyFVFYec5w= -go.opentelemetry.io/otel/trace v1.35.0 h1:dPpEfJu1sDIqruz7BHFG3c7528f6ddfSWfFDVt/xgMs= -go.opentelemetry.io/otel/trace v1.35.0/go.mod h1:WUk7DtFp1Aw2MkvqGdwiXYDZZNvA/1J8o6xRXLrIkyc= +go.opentelemetry.io/otel/metric v1.40.0 h1:rcZe317KPftE2rstWIBitCdVp89A2HqjkxR3c11+p9g= +go.opentelemetry.io/otel/metric v1.40.0/go.mod h1:ib/crwQH7N3r5kfiBZQbwrTge743UDc7DTFVZrrXnqc= +go.opentelemetry.io/otel/sdk v1.40.0 h1:KHW/jUzgo6wsPh9At46+h4upjtccTmuZCFAc9OJ71f8= +go.opentelemetry.io/otel/sdk v1.40.0/go.mod h1:Ph7EFdYvxq72Y8Li9q8KebuYUr2KoeyHx0DRMKrYBUE= +go.opentelemetry.io/otel/sdk/metric v1.40.0 h1:mtmdVqgQkeRxHgRv4qhyJduP3fYJRMX4AtAlbuWdCYw= +go.opentelemetry.io/otel/sdk/metric v1.40.0/go.mod h1:4Z2bGMf0KSK3uRjlczMOeMhKU2rhUqdWNoKcYrtcBPg= +go.opentelemetry.io/otel/trace v1.40.0 h1:WA4etStDttCSYuhwvEa8OP8I5EWu24lkOzp+ZYblVjw= +go.opentelemetry.io/otel/trace v1.40.0/go.mod h1:zeAhriXecNGP/s2SEG3+Y8X9ujcJOTqQ5RgdEJcawiA= go.opentelemetry.io/proto/otlp v1.3.1 h1:TrMUixzpM0yuc/znrFTP9MMRh8trP93mkCiDVeXrui0= go.opentelemetry.io/proto/otlp v1.3.1/go.mod h1:0X1WI4de4ZsLrrJNLAQbFeLCm3T7yBkR0XqQ7niQU+8= go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= @@ -291,8 +291,8 @@ golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210819135213-f52c844e1c1c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.32.0 h1:s77OFDvIQeibCmezSnk/q6iAfkdiQaJi4VzroCFrN20= -golang.org/x/sys v0.32.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/sys v0.40.0 h1:DBZZqJ2Rkml6QMQsZywtnjnnGvHza6BTfYFWY9kjEWQ= +golang.org/x/sys v0.40.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= golang.org/x/term v0.31.0 h1:erwDkOK1Msy6offm1mOgvspSkslFnIGsFnxOKoufg3o= golang.org/x/term v0.31.0/go.mod h1:R4BeIy7D95HzImkxGkTW1UQTtP54tio2RyHz7PwK0aw= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= diff --git a/images/pulsar-functions-python-runner/Dockerfile b/images/pulsar-functions-python-runner/Dockerfile index 9f9c8367a..1d8adf1f5 100644 --- a/images/pulsar-functions-python-runner/Dockerfile +++ b/images/pulsar-functions-python-runner/Dockerfile @@ -23,12 +23,10 @@ RUN if [ -d "/tmp/pulsar/cpp-client" ]; then mv /tmp/pulsar/cpp-client /pulsar/c # Install some utilities RUN apk update \ && apk add --no-cache python3 python3-dev tk-dev curl ca-certificates\ - && mv /usr/lib/python3.*/EXTERNALLY-MANAGED /tmp/EXTERNALLY-MANAGED.old - -RUN mkdir -p /etc/pki/tls/certs && cp /etc/ssl/certs/ca-certificates.crt /etc/pki/tls/certs/ca-bundle.crt - -RUN curl https://bootstrap.pypa.io/get-pip.py -o get-pip.py -RUN python3 get-pip.py + && mkdir -p /etc/pki/tls/certs && cp /etc/ssl/certs/ca-certificates.crt /etc/pki/tls/certs/ca-bundle.crt \ + && curl https://bootstrap.pypa.io/get-pip.py -o get-pip.py \ + && mv /usr/lib/python3.*/EXTERNALLY-MANAGED /tmp/EXTERNALLY-MANAGED.old \ + && python3 get-pip.py && pip3 install --upgrade pip setuptools WORKDIR /pulsar diff --git a/images/pulsar-functions-python-runner/pulsarctl.Dockerfile b/images/pulsar-functions-python-runner/pulsarctl.Dockerfile index f63170e6c..cb6a1a2cc 100644 --- a/images/pulsar-functions-python-runner/pulsarctl.Dockerfile +++ b/images/pulsar-functions-python-runner/pulsarctl.Dockerfile @@ -51,7 +51,7 @@ RUN apk update \ && mkdir -p /etc/pki/tls/certs && cp /etc/ssl/certs/ca-certificates.crt /etc/pki/tls/certs/ca-bundle.crt \ && curl https://bootstrap.pypa.io/get-pip.py -o get-pip.py \ && mv /usr/lib/python3.*/EXTERNALLY-MANAGED /tmp/EXTERNALLY-MANAGED.old \ - && python3 get-pip.py && pip3 install --upgrade pip + && python3 get-pip.py && pip3 install --upgrade pip setuptools RUN if [ -f "/pulsar/bin/install-pulsar-client-37.sh" ]; then /pulsar/bin/install-pulsar-client-37.sh || pip3 install 'pulsar-client[all]==3.5.0' ; else pip3 install 'pulsar-client[all]==3.5.0' ; fi RUN if [ -f "/pulsar/bin/install-pulsar-client.sh" ]; then /pulsar/bin/install-pulsar-client.sh || pip3 install 'pulsar-client[all]==3.5.0' ; else pip3 install 'pulsar-client[all]==3.5.0' ; fi diff --git a/images/samples/go-function-samples/Dockerfile b/images/samples/go-function-samples/Dockerfile index f11461291..945f5b42e 100644 --- a/images/samples/go-function-samples/Dockerfile +++ b/images/samples/go-function-samples/Dockerfile @@ -1,5 +1,5 @@ ARG PULSAR_IMAGE_TAG -FROM golang:1.24.11-trixie as builder +FROM golang:1.24.13-trixie as builder WORKDIR /workspace # Copy the Go Modules manifests diff --git a/images/samples/go-function-samples/func/go.mod b/images/samples/go-function-samples/func/go.mod index d76fd7063..9aace4ec8 100644 --- a/images/samples/go-function-samples/func/go.mod +++ b/images/samples/go-function-samples/func/go.mod @@ -1,6 +1,6 @@ module github.com/apache/pulsar/pulsar-function-go/examples -go 1.24.11 +go 1.24.13 require github.com/apache/pulsar/pulsar-function-go v0.0.0-20250430085326-611dc3f360b5 diff --git a/manifests/crd.yaml b/manifests/crd.yaml index 18cc958d0..411c1f735 100644 --- a/manifests/crd.yaml +++ b/manifests/crd.yaml @@ -3430,8 +3430,12 @@ spec: type: object java: properties: + entryClass: + type: string extraDependenciesDir: type: string + instancePath: + type: string jar: type: string jarLocation: @@ -3580,6 +3584,90 @@ spec: typeClassName: type: string type: object + packageService: + properties: + authConfig: + properties: + genericAuth: + properties: + clientAuthenticationParameters: + type: string + clientAuthenticationPlugin: + type: string + required: + - clientAuthenticationParameters + - clientAuthenticationPlugin + type: object + oauth2Config: + properties: + audience: + type: string + issuerUrl: + type: string + keySecretKey: + type: string + keySecretName: + type: string + scope: + type: string + required: + - audience + - issuerUrl + - keySecretKey + - keySecretName + type: object + type: object + authSecret: + type: string + cleanupAuthConfig: + properties: + genericAuth: + properties: + clientAuthenticationParameters: + type: string + clientAuthenticationPlugin: + type: string + required: + - clientAuthenticationParameters + - clientAuthenticationPlugin + type: object + oauth2Config: + properties: + audience: + type: string + issuerUrl: + type: string + keySecretKey: + type: string + keySecretName: + type: string + scope: + type: string + required: + - audience + - issuerUrl + - keySecretKey + - keySecretName + type: object + type: object + pulsarConfig: + type: string + tlsConfig: + properties: + allowInsecure: + type: boolean + certSecretKey: + type: string + certSecretName: + type: string + enabled: + type: boolean + hostnameVerification: + type: boolean + type: object + tlsSecret: + type: string + type: object persistentVolumeClaimRetentionPolicy: properties: whenDeleted: @@ -7417,8 +7505,12 @@ spec: type: object java: properties: + entryClass: + type: string extraDependenciesDir: type: string + instancePath: + type: string jar: type: string jarLocation: @@ -7500,6 +7592,90 @@ spec: negativeAckRedeliveryDelayMs: format: int32 type: integer + packageService: + properties: + authConfig: + properties: + genericAuth: + properties: + clientAuthenticationParameters: + type: string + clientAuthenticationPlugin: + type: string + required: + - clientAuthenticationParameters + - clientAuthenticationPlugin + type: object + oauth2Config: + properties: + audience: + type: string + issuerUrl: + type: string + keySecretKey: + type: string + keySecretName: + type: string + scope: + type: string + required: + - audience + - issuerUrl + - keySecretKey + - keySecretName + type: object + type: object + authSecret: + type: string + cleanupAuthConfig: + properties: + genericAuth: + properties: + clientAuthenticationParameters: + type: string + clientAuthenticationPlugin: + type: string + required: + - clientAuthenticationParameters + - clientAuthenticationPlugin + type: object + oauth2Config: + properties: + audience: + type: string + issuerUrl: + type: string + keySecretKey: + type: string + keySecretName: + type: string + scope: + type: string + required: + - audience + - issuerUrl + - keySecretKey + - keySecretName + type: object + type: object + pulsarConfig: + type: string + tlsConfig: + properties: + allowInsecure: + type: boolean + certSecretKey: + type: string + certSecretName: + type: string + enabled: + type: boolean + hostnameVerification: + type: boolean + type: object + tlsSecret: + type: string + type: object pod: properties: affinity: @@ -11016,8 +11192,12 @@ spec: type: string java: properties: + entryClass: + type: string extraDependenciesDir: type: string + instancePath: + type: string jar: type: string jarLocation: @@ -11160,6 +11340,90 @@ spec: typeClassName: type: string type: object + packageService: + properties: + authConfig: + properties: + genericAuth: + properties: + clientAuthenticationParameters: + type: string + clientAuthenticationPlugin: + type: string + required: + - clientAuthenticationParameters + - clientAuthenticationPlugin + type: object + oauth2Config: + properties: + audience: + type: string + issuerUrl: + type: string + keySecretKey: + type: string + keySecretName: + type: string + scope: + type: string + required: + - audience + - issuerUrl + - keySecretKey + - keySecretName + type: object + type: object + authSecret: + type: string + cleanupAuthConfig: + properties: + genericAuth: + properties: + clientAuthenticationParameters: + type: string + clientAuthenticationPlugin: + type: string + required: + - clientAuthenticationParameters + - clientAuthenticationPlugin + type: object + oauth2Config: + properties: + audience: + type: string + issuerUrl: + type: string + keySecretKey: + type: string + keySecretName: + type: string + scope: + type: string + required: + - audience + - issuerUrl + - keySecretKey + - keySecretName + type: object + type: object + pulsarConfig: + type: string + tlsConfig: + properties: + allowInsecure: + type: boolean + certSecretKey: + type: string + certSecretName: + type: string + enabled: + type: boolean + hostnameVerification: + type: boolean + type: object + tlsSecret: + type: string + type: object pod: properties: affinity: @@ -14825,8 +15089,12 @@ spec: type: object java: properties: + entryClass: + type: string extraDependenciesDir: type: string + instancePath: + type: string jar: type: string jarLocation: @@ -14975,6 +15243,90 @@ spec: typeClassName: type: string type: object + packageService: + properties: + authConfig: + properties: + genericAuth: + properties: + clientAuthenticationParameters: + type: string + clientAuthenticationPlugin: + type: string + required: + - clientAuthenticationParameters + - clientAuthenticationPlugin + type: object + oauth2Config: + properties: + audience: + type: string + issuerUrl: + type: string + keySecretKey: + type: string + keySecretName: + type: string + scope: + type: string + required: + - audience + - issuerUrl + - keySecretKey + - keySecretName + type: object + type: object + authSecret: + type: string + cleanupAuthConfig: + properties: + genericAuth: + properties: + clientAuthenticationParameters: + type: string + clientAuthenticationPlugin: + type: string + required: + - clientAuthenticationParameters + - clientAuthenticationPlugin + type: object + oauth2Config: + properties: + audience: + type: string + issuerUrl: + type: string + keySecretKey: + type: string + keySecretName: + type: string + scope: + type: string + required: + - audience + - issuerUrl + - keySecretKey + - keySecretName + type: object + type: object + pulsarConfig: + type: string + tlsConfig: + properties: + allowInsecure: + type: boolean + certSecretKey: + type: string + certSecretName: + type: string + enabled: + type: boolean + hostnameVerification: + type: boolean + type: object + tlsSecret: + type: string + type: object persistentVolumeClaimRetentionPolicy: properties: whenDeleted: @@ -18891,8 +19243,12 @@ spec: type: object java: properties: + entryClass: + type: string extraDependenciesDir: type: string + instancePath: + type: string jar: type: string jarLocation: @@ -18974,6 +19330,90 @@ spec: negativeAckRedeliveryDelayMs: format: int32 type: integer + packageService: + properties: + authConfig: + properties: + genericAuth: + properties: + clientAuthenticationParameters: + type: string + clientAuthenticationPlugin: + type: string + required: + - clientAuthenticationParameters + - clientAuthenticationPlugin + type: object + oauth2Config: + properties: + audience: + type: string + issuerUrl: + type: string + keySecretKey: + type: string + keySecretName: + type: string + scope: + type: string + required: + - audience + - issuerUrl + - keySecretKey + - keySecretName + type: object + type: object + authSecret: + type: string + cleanupAuthConfig: + properties: + genericAuth: + properties: + clientAuthenticationParameters: + type: string + clientAuthenticationPlugin: + type: string + required: + - clientAuthenticationParameters + - clientAuthenticationPlugin + type: object + oauth2Config: + properties: + audience: + type: string + issuerUrl: + type: string + keySecretKey: + type: string + keySecretName: + type: string + scope: + type: string + required: + - audience + - issuerUrl + - keySecretKey + - keySecretName + type: object + type: object + pulsarConfig: + type: string + tlsConfig: + properties: + allowInsecure: + type: boolean + certSecretKey: + type: string + certSecretName: + type: string + enabled: + type: boolean + hostnameVerification: + type: boolean + type: object + tlsSecret: + type: string + type: object pod: properties: affinity: @@ -22569,8 +23009,12 @@ spec: type: string java: properties: + entryClass: + type: string extraDependenciesDir: type: string + instancePath: + type: string jar: type: string jarLocation: @@ -22713,6 +23157,90 @@ spec: typeClassName: type: string type: object + packageService: + properties: + authConfig: + properties: + genericAuth: + properties: + clientAuthenticationParameters: + type: string + clientAuthenticationPlugin: + type: string + required: + - clientAuthenticationParameters + - clientAuthenticationPlugin + type: object + oauth2Config: + properties: + audience: + type: string + issuerUrl: + type: string + keySecretKey: + type: string + keySecretName: + type: string + scope: + type: string + required: + - audience + - issuerUrl + - keySecretKey + - keySecretName + type: object + type: object + authSecret: + type: string + cleanupAuthConfig: + properties: + genericAuth: + properties: + clientAuthenticationParameters: + type: string + clientAuthenticationPlugin: + type: string + required: + - clientAuthenticationParameters + - clientAuthenticationPlugin + type: object + oauth2Config: + properties: + audience: + type: string + issuerUrl: + type: string + keySecretKey: + type: string + keySecretName: + type: string + scope: + type: string + required: + - audience + - issuerUrl + - keySecretKey + - keySecretName + type: object + type: object + pulsarConfig: + type: string + tlsConfig: + properties: + allowInsecure: + type: boolean + certSecretKey: + type: string + certSecretName: + type: string + enabled: + type: boolean + hostnameVerification: + type: boolean + type: object + tlsSecret: + type: string + type: object pod: properties: affinity: diff --git a/pkg/webhook/function_webhook.go b/pkg/webhook/function_webhook.go index 2332251e5..efaf910ea 100644 --- a/pkg/webhook/function_webhook.go +++ b/pkg/webhook/function_webhook.go @@ -312,6 +312,10 @@ func (webhook *FunctionWebhook) ValidateCreate(ctx context.Context, obj runtime. if fieldErr != nil { allErrs = append(allErrs, fieldErr) } + fieldErr = validatePackageService(r.Spec.PackageService) + if fieldErr != nil { + allErrs = append(allErrs, fieldErr) + } fieldErr = validateBuiltinHPARules(r.Spec.Pod.BuiltinAutoscaler) if fieldErr != nil { diff --git a/pkg/webhook/sink_webhook.go b/pkg/webhook/sink_webhook.go index 1ba57f2c1..8a4a2b436 100644 --- a/pkg/webhook/sink_webhook.go +++ b/pkg/webhook/sink_webhook.go @@ -233,6 +233,10 @@ func (webhook *SinkWebhook) ValidateCreate(ctx context.Context, obj runtime.Obje if fieldErr != nil { allErrs = append(allErrs, fieldErr) } + fieldErr = validatePackageService(r.Spec.PackageService) + if fieldErr != nil { + allErrs = append(allErrs, fieldErr) + } if len(allErrs) == 0 { return nil, nil diff --git a/pkg/webhook/source_webhook.go b/pkg/webhook/source_webhook.go index bb8a44234..57418860d 100644 --- a/pkg/webhook/source_webhook.go +++ b/pkg/webhook/source_webhook.go @@ -223,6 +223,10 @@ func (webhook *SourceWebhook) ValidateCreate(ctx context.Context, obj runtime.Ob if fieldErr != nil { allErrs = append(allErrs, fieldErr) } + fieldErr = validatePackageService(r.Spec.PackageService) + if fieldErr != nil { + allErrs = append(allErrs, fieldErr) + } if len(allErrs) == 0 { return nil, nil diff --git a/pkg/webhook/validate.go b/pkg/webhook/validate.go index f5be491d2..3130c2376 100644 --- a/pkg/webhook/validate.go +++ b/pkg/webhook/validate.go @@ -404,6 +404,17 @@ func validateMessaging(messaging *v1alpha1.Messaging) *field.Error { return nil } +func validatePackageService(packageService *v1alpha1.PulsarMessaging) *field.Error { + if packageService == nil { + return nil + } + if packageService.PulsarConfig == "" { + return field.Invalid(field.NewPath("spec").Child("packageService"), packageService, + "packageService.pulsarConfig needs to be set") + } + return nil +} + func validateBuiltinHPARules(rules []v1alpha1.BuiltinHPARule) *field.Error { isCPURuleExists := false isMemoryRuleExists := false diff --git a/redhat.Dockerfile b/redhat.Dockerfile index 3222b9271..ad866708c 100644 --- a/redhat.Dockerfile +++ b/redhat.Dockerfile @@ -1,5 +1,5 @@ # Build the manager binary -FROM golang:1.24.11-trixie as builder +FROM golang:1.24.13-trixie as builder WORKDIR /workspace/api COPY api/ .