|
| 1 | +# Copyright 2018 Google LLC |
| 2 | +# |
| 3 | +# Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | +# you may not use this file except in compliance with the License. |
| 5 | +# You may obtain a copy of the License at |
| 6 | +# |
| 7 | +# https://www.apache.org/licenses/LICENSE-2.0 |
| 8 | +# |
| 9 | +# Unless required by applicable law or agreed to in writing, software |
| 10 | +# distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | +# See the License for the specific language governing permissions and |
| 13 | +# limitations under the License. |
| 14 | + |
| 15 | +"""An example DAG demonstrating Kubernetes Pod Operator.""" |
| 16 | + |
| 17 | +from __future__ import print_function |
| 18 | + |
| 19 | +import datetime |
| 20 | + |
| 21 | +from airflow import models |
| 22 | +from airflow.contrib.kubernetes.pod import Resources |
| 23 | +from airflow.contrib.kubernetes.secret import Secret |
| 24 | +from airflow.contrib.kubernetes.volume import Volume |
| 25 | +from airflow.contrib.kubernetes.volume_mount import VolumeMount |
| 26 | +from airflow.contrib.operators import kubernetes_pod_operator |
| 27 | + |
# One day, reused for both the schedule and the start-date offset.
_ONE_DAY = datetime.timedelta(days=1)

# Arguments applied to the DAG / its tasks.
# NOTE(review): 'schedule_interval' is a DAG-level setting, not a per-task
# default — confirm whether it should be passed to models.DAG directly
# instead of via default_args.
default_dag_args = {
    # A start date in the past makes Airflow trigger the DAG as soon as
    # it is discovered.
    'start_date': datetime.datetime.now() - _ONE_DAY,
    # Run the DAG once per day.
    'schedule_interval': _ONE_DAY,
}
| 34 | + |
# Kubernetes Secrets hold small pieces of sensitive data (passwords,
# tokens, keys). Keeping such data in a Secret object, rather than baked
# into a Pod spec or image, gives more control over how it is used and
# reduces the risk of accidental exposure.
#
# This Secret is surfaced to the container as a file.
secret_file = Secret(
    deploy_type='volume',           # mount as a file on RAM-backed tmpfs
    deploy_target='/etc/sql_conn',  # file path inside the container
    secret='airflow-secrets',       # name of the Secret in Kubernetes
    key='sql_alchemy_conn',         # key within that Secret
)
| 50 | + |
# The same Kubernetes Secret, this time injected into the container as an
# environment variable rather than a file.
secret_env = Secret(
    deploy_type='env',          # expose as an environment variable
    deploy_target='SQL_CONN',   # variable name inside the container
    secret='airflow-secrets',   # name of the Secret in Kubernetes
    key='sql_alchemy_conn',     # key within that Secret
)
| 61 | + |
# Compute resources for the Pod's container.
# Kubernetes requires each limit to be >= the corresponding request; the
# original values (limit_memory 100Mi < request_memory 200Mi, and
# limit_cpu 500m < request_cpu 2) produce an invalid Pod spec that the
# API server rejects, so the limits below are raised to match.
resources_obj = Resources(
    # Amount of memory requested; can use E, P, T, G, M, K, Ei, Pi, Ti,
    # Gi, Mi, Ki as suffixes.
    request_memory='200Mi',
    # Number of CPUs the container attempts to use. One 'cpu' is
    # equivalent to 1 AWS vCPU, 1 GCP Core, 1 Azure vCore or 1
    # Hyperthread on a bare-metal Intel processor with Hyperthreading.
    # If the CPU request exceeds all of your nodes' capacities the Pod
    # will never be scheduled.
    request_cpu='2',
    # Hard memory cap: exceeding it gets the Pod terminated. If omitted
    # there is no upper bound (a per-namespace default may also apply).
    # Must be at least request_memory.
    limit_memory='500Mi',
    # Hard CPU cap. The 'm' suffix stands for milli-cpus (500m == .5
    # cpu). Must be at least request_cpu.
    limit_cpu='2',
)
| 81 | + |
# Creates a volume of type emptyDir without any configs.
# 'configs' must be a dict mapping volume type -> settings; the original
# used a set literal ({'emptyDir', {}}), which raises
# "TypeError: unhashable type: 'dict'" the moment this module is imported.
volumes = [Volume(name='empty-vol', configs={'emptyDir': {}})]
| 84 | + |
# Used to mount pod-level volumes to a running container.
volume_mounts = [
    VolumeMount(
        name='test-vol-mount',
        mount_path='/root/mount_file',
        # The original passed the STRING 'None', which would become a
        # literal "None" subPath in the Pod spec; use the value None to
        # mount the volume's root.
        sub_path=None,
        read_only=False,
    )
]
| 91 | + |
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(dag_id='kubernetes-example',
                default_args=default_dag_args) as dag:
    # Only name, namespace, and image are required to create a
    # KubernetesPodOperator. This operator defaults to using the config file
    # found at `~/.kube/config` if no `config_file` parameter is specified.
    kubernetes_min_pod = kubernetes_pod_operator.KubernetesPodOperator(
        # The ID specified for the task. Must be unique within the DAG;
        # the original reused 'pi_task_full' for both tasks, which
        # Airflow rejects as a duplicate task id.
        task_id='pi_task_minimum',
        # Name of task you want to run, used to generate the pod id.
        name='pi',
        # The namespace to run within Kubernetes.
        # To reduce resource starvation for airflow workers and scheduler in
        # Cloud Composer, be sure to have namespace point to a custom node pool.
        namespace='default',
        # Docker image specified. Defaults to hub.docker.com, but any fully
        # qualified URL will point to a custom repository.
        image='ubuntu',
    )

    kubernetes_full_pod = kubernetes_pod_operator.KubernetesPodOperator(
        task_id='pi_task_full',
        name='pi',
        namespace='default',
        image='perl',
        # Entrypoint of the container; if not specified the Docker container's
        # entrypoint is used.
        cmds=['perl'],
        # Arguments to the entrypoint. The docker image's CMD is used if this
        # is not provided.
        arguments=['-Mbignum=bpi', '-wle', 'print bpi(2000)'],
        # The secrets to pass to the Pod.
        secrets=[secret_env, secret_file],
        # Labels to apply to the Pod.
        labels={'pod-label': 'label-name'},
        # Timeout to start up the pod, default is 120.
        startup_timeout_seconds=120,
        # The environment variables to be initialized in the container.
        env_vars={'EXAMPLE_VAR': '/example/value'},
        # If true, logs stdout output of the container.
        get_logs=True,
        # Determines when to pull a fresh image: 'IfNotPresent' makes the
        # Kubelet skip pulling an image it already has; set to 'Always' to
        # always pull a new image.
        image_pull_policy='IfNotPresent',
        # Annotations are non-identifying metadata attached to the Pod.
        # They can hold a large range of data, including characters not
        # permitted in labels.
        annotations={'key1': 'value1'},
        # Resource specifications for the Pod, of type pod.Resources.
        resources=resources_obj,
        # Path to the kubernetes config. If no config is specified, defaults
        # to '~/.kube/config'.
        config_file='~/.kube/config',
        # If true, the content of /airflow/xcom/return.json from the container
        # is also pushed to an XCom when the container ends.
        xcom_push=True,
        # List of volumes to pass to the Pod.
        volumes=volumes,
        # List of volume mounts to pass to the Pod.
        volume_mounts=volume_mounts,
        # Affinity determines which nodes the Pod can run on based on the
        # config. For more information see
        # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
        affinity={}
    )
0 commit comments