kurtzace/airflow-experiments

Airflow 2.0

Guide

Instructions to run

First time

  • Ensure Docker Compose v1.29.1 or newer is installed on your workstation.

On Ubuntu:

sudo apt-get update
sudo apt-get install docker-compose-plugin
  • echo -e "AIRFLOW_UID=$(id -u)\nAIRFLOW_GID=0" > .env
  • sudo docker compose up airflow-init

Every time

  • sudo docker compose up

  • Sign-in page: enter airflow / airflow as the username and password.

  • View the DAGs in the 2.0 UI (screenshot in the repo).

  • Run 1airflow_experiments; its graph is shown in the screenshot.

See the RabbitMQ message (screenshot in the repo).

The 2nd DAG is dynamically generated from JSON:

docker/airflow/fileinput/input_dag.json
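
A minimal sketch of how such dynamic generation can work (the JSON schema, DAG id, and task names below are assumptions for illustration; the repo's actual input_dag.json may differ):

# dynamic_dag_sketch.py - hypothetical; the schema of input_dag.json may differ.
import json
from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator

# Read the spec at DAG-parse time (assumed container mount point)
with open("/opt/airflow/fileinput/input_dag.json") as f:
    spec = json.load(f)  # e.g. {"dag_id": "from_json", "tasks": [{"id": "t1", "cmd": "echo hi"}]}

with DAG(spec["dag_id"], start_date=datetime(2022, 5, 1), schedule_interval=None) as dag:
    prev = None
    for t in spec["tasks"]:
        task = BashOperator(task_id=t["id"], bash_command=t["cmd"])
        if prev:
            prev >> task  # chain tasks in the order listed in the JSON
        prev = task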

The 3rd DAG is human_in_loop (screenshot in the repo).

Replace myip with your IP address in 3human....py. Find it by running:

# on Ubuntu
hostname -I

Ensure you run both docker compose files:

# using docker-compose version 1.27.4
sudo docker-compose -f docker-compose19.yml build
sudo docker-compose -f docker-compose19.yml up
sudo docker compose up

docker-compose19.yml starts the API and Mongo; the normal docker-compose.yml starts Airflow and RabbitMQ.

After it is triggered, you will see the run stay pending indefinitely: it is waiting for a human.

So copy the jobid from the XCom tab of mongo_task (screenshot in the repo).

Then call the API, either by visiting http://localhost:60001/docs or with curl:

curl -X 'GET' \
  'http://localhost:60001/sethumaninput/22-0530-1314?status=false' \
  -H 'accept: application/json'

The status query parameter determines whether the accurate or the inaccurate flow is called.
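
Under the hood this is a polling pattern: a sensor task re-checks Mongo until the API above has recorded the human's decision. A minimal hypothetical sketch (the task names, Mongo connection string, and document fields are assumptions, not the repo's exact code):

# Hypothetical sketch of the human-in-the-loop polling pattern.
from datetime import datetime
from airflow import DAG
from airflow.sensors.python import PythonSensor
from pymongo import MongoClient

def human_has_answered(ti):
    # jobid is assumed to be pushed to XCom by the upstream mongo_task
    jobid = ti.xcom_pull(task_ids="mongo_task")
    doc = MongoClient("mongodb://mongo:27017").test.postcollection.find_one({"jobid": jobid})
    # True (sensor succeeds) once the API has written the human's status
    return doc is not None and "status" in doc

with DAG("human_in_loop_sketch", start_date=datetime(2022, 5, 1), schedule_interval=None) as dag:
    wait_for_human = PythonSensor(
        task_id="wait_for_human",
        python_callable=human_has_answered,
        poke_interval=30,  # this re-check is why the run looks "pending" forever
    )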

REST call attempt:

ENDPOINT_URL="http://localhost:8080/"
curl -X GET \
    --user "airflow:airflow" \
    "${ENDPOINT_URL}api/v1/pools"

4th: human non-polling

Run the 4th DAG.

Visit the graph view and note the highlighted details (the dag_run_id and task_id used below; screenshot in the repo).

Call the following endpoint to mark the task as success (i.e., the emulated human action):

ENDPOINT_URL="http://localhost:8080/"
curl -X POST  \
    --user "airflow:airflow" \
    -H 'Content-Type: application/json' \
    "${ENDPOINT_URL}api/v1/dags/4human_non_polling/updateTaskInstancesState" \
    -d "{ \"dry_run\": false, \"task_id\": \"manual_sign_off\", \
    \"include_upstream\":  false, \"include_downstream\":  false, \"include_future\":  false, \"include_past\":  false, \
  \"dag_run_id\": \"manual__2022-05-31T13:01:13.894084+00:00\", \
  \"new_state\": \"success\" }"

Explanation of the above API

Then the next step will execute.

The concept is derived from this link.

5th: mid-sized pipeline

  • Add an AWS connection string like this (screenshot and ref in the repo).
  • I created a file s3://git-experiments/airflow-exp-p5-file-list.json (read by the sketch below) with contents
["plate_mh_p1.jpg", "rec_part2_p2.jpg", "survey_like_p2.png", "rec_appliance_p2.png", "rec_walmart_p1.jpg", "tax_us_p1.png"]

A graphical way of authoring DAGs via Airflow 2.x has issues.

BPM-type workflow?

git clone https://github.com/sartography/spiff-example-cli
cd spiff-example-cli/
sudo docker build . -t spiff_example_cli
docker run -it --rm spiff_example_cli

Things to try:

  • make JSON work
  • wait for human action through the UI and then proceed with the next task
  • generate a DAG from a drawing
  • drawing rules (priority codes, process documents as per priority)
  • parallel flow and merge (one that waits for manual action)
  • approve/reject alternate flow
  • queue - Jira or a library

Clean up

docker compose down --volumes --rmi all

Airflow 1.9

Uses docker-compose version 1.27.4.

Guides

Getting started project

git productionalizing-data-pipelines-airflow

Instructions to run the project:

sudo docker-compose -f docker-compose19.yml build
sudo docker-compose -f docker-compose19.yml up

Trigger a run

First run

  • Hello world, with hello_world.py (a minimal sketch follows this list)
  • docker-compose up
  • view the UI (screenshot in the repo)
  • follow steps 1-6 to trigger the run and view the logs (screenshot in the repo)
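
For reference, a minimal sketch of what a hello_world.py of this era looks like (Airflow 1.9-style imports; the exact file in the repo may differ):

# Minimal Airflow 1.9-style hello world DAG (sketch).
from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

def say_hello():
    print("Hello world")

dag = DAG("hello_world", start_date=datetime(2019, 1, 1), schedule_interval=None)

hello_task = PythonOperator(task_id="hello_task", python_callable=say_hello, dag=dag)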

File, HTTP, Mongo

  • drop files into the ./docker/airflow/fileinput folder to exercise the file sensor, which polls every 30 seconds (a sketch follows after this list)
  • create an HTTP connection (optional; if not done, the Python code currently emulates it via the Airflow add-connection CLI): Admin > Connections > Add new, and enter params as in the screenshot
  • after starting the http_rabbit DAG
  • to test the Mongo insert, run:
sudo docker exec -it airflow-experiments_mongo_1 sh
# mongosh
test> show collections
postcollection
test> db.postcollection.count()
test> db.postcollection.find({}).limit(1)
  • graph view, tree view, and DAG view (screenshots in the repo)
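
A sketch of the 30-second-poll file sensor (assumes the contrib FileSensor of 1.10-era Airflow, an fs_default filesystem connection, and a container path mapping for ./docker/airflow/fileinput; the repo's sensor may be custom):

# Sketch of a file sensor polling the mounted input folder every 30 seconds.
from datetime import datetime
from airflow import DAG
from airflow.contrib.sensors.file_sensor import FileSensor

dag = DAG("file_http_mongo_sketch", start_date=datetime(2019, 1, 1), schedule_interval=None)

wait_for_file = FileSensor(
    task_id="wait_for_file",
    filepath="/usr/local/airflow/fileinput",  # assumed container mount of ./docker/airflow/fileinput
    fs_conn_id="fs_default",
    poke_interval=30,  # the 30-second poll mentioned above
    dag=dag,
)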

API

  • list the latest runs: http://localhost:8080/api/experimental/latest_runs
  • trigger a DAG run using the API:
curl -X POST \
  http://localhost:8080/api/experimental/dags/hello_world/dag_runs \
  -H 'Cache-Control: no-cache' \
  -H 'Content-Type: application/json' \
  -d '{"conf":"{\"key\":\"value\"}"}'

RabbitMQ operator issue

Because for a few plugins (for example, RabbitMQ), we run into operator incompatibility.

Appendix

providers

How to
