Skip to content

Commit 4d70c7c

Browse files
committed
Add KubernetesPodOperator example to composer/workflows
1 parent 87ce0ff commit 4d70c7c

File tree

1 file changed

+158
-0
lines changed

1 file changed

+158
-0
lines changed
Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
# Copyright 2018 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# https://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""An example DAG demonstrating Kubernetes Pod Operator."""
16+
17+
from __future__ import print_function
18+
19+
import datetime
20+
21+
from airflow import models
22+
from airflow.contrib.kubernetes.pod import Resources
23+
from airflow.contrib.kubernetes.secret import Secret
24+
from airflow.contrib.kubernetes.volume import Volume
25+
from airflow.contrib.kubernetes.volume_mount import VolumeMount
26+
from airflow.contrib.operators import kubernetes_pod_operator
27+
28+
# Arguments forwarded to every operator created inside the DAG.
# NOTE: `schedule_interval` is deliberately not listed here. default_args are
# only handed to operator constructors, so a schedule placed in this dict is
# never read by the DAG itself; the schedule has to be given to `models.DAG`
# directly (when omitted there, Airflow falls back to its once-per-day
# default).
default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately after
    # discovery.
    'start_date': datetime.datetime.now() - datetime.timedelta(days=1),
}
34+
35+
# Kubernetes Secrets hold small pieces of sensitive data (passwords, tokens,
# keys). Keeping such values in a Secret instead of in the Pod specification
# or the container image gives finer control over how they are used and
# lowers the risk of accidental exposure.
#
# This Secret is surfaced to the container as a file.
secret_file = Secret(
    # Name of the Secret object in Kubernetes
    secret='airflow-secrets',
    # Key inside that Secret whose value is exposed
    key='sql_alchemy_conn',
    # 'volume' mounts the secret as a file in RAM-backed tmpfs
    deploy_type='volume',
    # For 'volume' deployments this is the file path to mount the secret at
    deploy_target='/etc/sql_conn',
)
50+
51+
# Same Kubernetes Secret as a file-based deployment would use, but surfaced
# to the container through an environment variable.
secret_env = Secret(
    # Name of the Secret object in Kubernetes
    secret='airflow-secrets',
    # Key inside that Secret whose value is exposed
    key='sql_alchemy_conn',
    # 'env' exposes the secret as an environment variable
    deploy_type='env',
    # For 'env' deployments this is the name of the environment variable
    deploy_target='SQL_CONN',
)
61+
62+
# Compute resources for the Pod. Kubernetes requires each request to be less
# than or equal to the corresponding limit; a Pod whose request exceeds its
# limit is rejected, so the values below keep request <= limit.
resources_obj = Resources(
    # Amount of memory requested, can use E, P, T, G, M, K, Ei, Pi,
    # Ti, Gi, Mi, Ki as suffixes
    request_memory='100Mi',
    # Amount of CPU requested. One 'cpu' is equivalent to 1 AWS vCPU,
    # 1 GCP Core, 1 Azure vCore or 1 Hyperthread on a bare-metal Intel
    # processor with Hyperthreading. The m suffix stands for milli-cpus,
    # therefore .25 cpu and 250m cpu are equivalent.
    # If the CPU request exceeds the capacity of every node, the Pod will
    # never get scheduled.
    request_cpu='250m',
    # If the memory limit is exceeded the Pod goes up for termination; if no
    # limit is specified there is no upper bound on the amount of memory it
    # can use. You can also specify a default memory limit on a
    # per-namespace basis.
    limit_memory='200Mi',
    # Upper bound on the CPU the container may use (500m == half a cpu).
    limit_cpu='500m'
)
81+
82+
# Creates a volume of type emptyDir without any configs.
# `configs` must be a mapping from the volume type to its settings; the
# previous `{'emptyDir', {}}` was a set literal containing a dict, which
# raises "TypeError: unhashable type: 'dict'" as soon as the module loads.
volumes = [Volume(name='empty-vol', configs={'emptyDir': {}})]
84+
85+
# Used to mount pod level volumes to a running container.
volume_mounts = [
    VolumeMount(
        # Must match the name of a volume declared for the Pod
        # ('empty-vol'); a mount that names an unknown volume makes the Pod
        # specification invalid.
        name='empty-vol',
        # Path inside the container where the volume appears
        mount_path='/root/mount_file',
        # No sub-path: mount the root of the volume. This has to be the None
        # object, not the string 'None', which would be taken as a directory
        # literally named "None".
        sub_path=None,
        read_only=False,
    ),
]
91+
92+
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
        dag_id='kubernetes-example',
        # Run this DAG once per day. The schedule is a DAG-level setting, so
        # it is passed to the constructor explicitly instead of through
        # default_args (which are only forwarded to operators).
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:
    # Only name, namespace, and image are required to create a
    # KubernetesPodOperator. This operator defaults to using the config file
    # found at `~/.kube/config` if no `config_file` parameter is specified.
    kubernetes_min_pod = kubernetes_pod_operator.KubernetesPodOperator(
        # The ID specified for the task. Task IDs must be unique within a
        # DAG, so this one differs from the fully-configured example below.
        task_id='pi_task_minimal',
        # Name of task you want to run, used to generate pod id
        name='pi',
        # The namespace to run within Kubernetes
        # To reduce resource starvation for airflow workers and scheduler in
        # Cloud Composer, be sure to have namespace point to a custom node
        # pool
        namespace='default',
        # Docker image specified. Defaults to hub.docker.com, but any fully
        # qualified URLs will point to a custom repository
        image='ubuntu',
    )

    # The same operator with every optional setting shown in this example
    # spelled out.
    kubernetes_full_pod = kubernetes_pod_operator.KubernetesPodOperator(
        task_id='pi_task_full',
        name='pi',
        namespace='default',
        image='perl',
        # Entrypoint of the container, if not specified the Docker
        # container's entrypoint is used.
        cmds=['perl'],
        # Arguments to the entrypoint. The docker image's CMD is used if this
        # is not provided
        arguments=['-Mbignum=bpi', '-wle', 'print bpi(2000)'],
        # The secrets to pass to Pod
        secrets=[secret_env, secret_file],
        # Labels to apply to the Pod
        labels={'pod-label': 'label-name'},
        # Timeout to start up the pod, default is 120
        startup_timeout_seconds=120,
        # The environment variables to be initialized in the container
        env_vars={'EXAMPLE_VAR': '/example/value'},
        # If true, logs stdout output of container
        get_logs=True,
        # Determines when to pull a fresh image, if 'IfNotPresent' will cause
        # the Kubelet to skip pulling an image if it already exists. If you
        # want to always pull a new image, set it to 'Always'
        image_pull_policy='IfNotPresent',
        # Annotations are non-identifying metadata you can attach to the Pod
        # Can be a large range of data, and can include characters
        # that are not permitted by labels
        annotations={'key1': 'value1'},
        # Resource specifications for Pod, of type pod.Resources
        resources=resources_obj,
        # Specifies path to kubernetes config. If no config is specified will
        # default to '~/.kube/config'
        config_file='~/.kube/config',
        # If true, the content of /airflow/xcom/return.json from container
        # will also be pushed to an XCom when the container ends. The command
        # above only prints to stdout and never writes that file, so XCom
        # pushing is left disabled here.
        xcom_push=False,
        # List of volumes to pass to the Pod
        volumes=volumes,
        # List of volume mounts to pass to the Pod
        volume_mounts=volume_mounts,
        # Affinity determines which nodes the Pod can run on based on the
        # config. For more information see
        # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
        affinity={}
    )

0 commit comments

Comments
 (0)