diff --git a/bigtop-bigpetstore/bigpetstore-spark/README.md b/bigtop-bigpetstore/bigpetstore-spark/README.md
index e550e48f31..9cd1bdaf33 100644
--- a/bigtop-bigpetstore/bigpetstore-spark/README.md
+++ b/bigtop-bigpetstore/bigpetstore-spark/README.md
@@ -172,3 +172,66 @@ spark-submit --master local[2] --class org.apache.bigtop.bigpetstore.spark.analy
 
 ```
 The resulting json file will contain lists of customers, products, and products recommended to each customer.
+
+Airflow Integration
+--------------------------------------------
+
+The steps described above are consolidated into [a single Airflow DAG](../../bigtop-deploy/puppet/modules/airflow/templates/example_bigpetstore.py).
+You can try it as follows. The example here was tested on Debian 12 and assumes that [Puppet](../../bigtop_toolchain/bin/puppetize.sh) and the [Bigtop toolchain](../../bigtop_toolchain/README.md) are already installed, and that BPS Spark has been built as a shadow JAR in accordance with [the guide above](#building-and-running-with-spark).
+
+1. (Optional) Build Airflow and Spark including their dependencies.
+   You can skip this step if you use Bigtop's binary packages from its public repository.
+
+```
+$ ./gradlew allclean airflow-pkg spark-pkg repo -Dbuildwithdeps=true
+```
+
+2. Deploy the packages above through Bigtop's Puppet manifests with appropriate parameters:
+
+```
+$ cat bigtop-deploy/puppet/hieradata/site.yaml
+bigtop::bigtop_repo_gpg_check: false
+bigtop::bigtop_repo_uri: [...]
+bigtop::hadoop_head_node: ...
+hadoop::hadoop_storage_dirs: [/data]
+hadoop_cluster_node::cluster_components: [bigtop-utils, hdfs, yarn, spark, airflow]
+airflow::server::install_bigpetstore_example: true # Enable the BigPetStore DAG
+airflow::server::load_examples: false # Disable Airflow's default examples for simplicity
+$ sudo cp -r bigtop-deploy/puppet/hiera* /etc/puppet
+$ sudo puppet apply --hiera_config=/etc/puppet/hiera.yaml --modulepath=/vagrant_data/bigtop/bigtop-deploy/puppet/modules:/etc/puppet/code/modules:/usr/share/puppet/modules /vagrant_data/bigtop/bigtop-deploy/puppet/manifests
+```
+
+3. Create a home directory on HDFS for the airflow user:
+
+```
+$ sudo -u hdfs hdfs dfs -mkdir /user/airflow
+$ sudo -u hdfs hdfs dfs -chown airflow:airflow /user/airflow
+```
+
+4. Log in to Airflow's web UI with admin/admin and wait a while for the DAG to be picked up.
+   Once it appears, you can run it via the triangle button on the right.
+   ![dag_list](images/dag_list.png)
+
+5. Trigger the DAG with the path to the BPS Spark JAR; a CLI alternative is sketched below.
+   ![trigger_dag](images/trigger_dag.png)
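+
+   Alternatively, the same run can be triggered from the command line. The
+   following is a sketch, assuming the airflow CLI is executed as the airflow
+   user and the path is replaced with the actual location of your JAR:
+
+```
+$ airflow dags unpause bigpetstore_dag
+$ airflow dags trigger bigpetstore_dag \
+    --conf '{"bigpetstore_jar_path": "/path/to/bigpetstore-spark-3.6.0-all.jar"}'
+```
+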
+6. If the settings are correct, the BPS DAG should run successfully as follows:
+   ![running_dag](images/running_dag.png)
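+
+If the DAG does not show up in the UI, an import error in the DAG file is a
+common cause. One quick way to check, assuming the airflow CLI is executed
+as the airflow user:
+
+```
+$ airflow dags list-import-errors
+```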
diff --git a/bigtop-bigpetstore/bigpetstore-spark/build.gradle b/bigtop-bigpetstore/bigpetstore-spark/build.gradle
index 970065b482..eae173eacc 100644
--- a/bigtop-bigpetstore/bigpetstore-spark/build.gradle
+++ b/bigtop-bigpetstore/bigpetstore-spark/build.gradle
@@ -91,7 +91,7 @@ dependencies {
     compile("org.apache.spark:spark-core_${scalaVersion}:${sparkVersion}")
     compile("org.apache.spark:spark-mllib_${scalaVersion}:${sparkVersion}")
     compile("org.apache.spark:spark-sql_${scalaVersion}:${sparkVersion}")
-    compile "org.apache.bigtop:bigpetstore-data-generator:3.5.0-SNAPSHOT"
+    compile "org.apache.bigtop:bigpetstore-data-generator:3.6.0-SNAPSHOT"
     compile "org.json4s:json4s-jackson_${scalaVersion}:3.6.12"
 
     testCompile "junit:junit:4.13.2"
diff --git a/bigtop-bigpetstore/bigpetstore-spark/images/dag_list.png b/bigtop-bigpetstore/bigpetstore-spark/images/dag_list.png
new file mode 100644
index 0000000000..3e18b441a1
Binary files /dev/null and b/bigtop-bigpetstore/bigpetstore-spark/images/dag_list.png differ
diff --git a/bigtop-bigpetstore/bigpetstore-spark/images/running_dag.png b/bigtop-bigpetstore/bigpetstore-spark/images/running_dag.png
new file mode 100644
index 0000000000..b8dbde8a22
Binary files /dev/null and b/bigtop-bigpetstore/bigpetstore-spark/images/running_dag.png differ
diff --git a/bigtop-bigpetstore/bigpetstore-spark/images/trigger_dag.png b/bigtop-bigpetstore/bigpetstore-spark/images/trigger_dag.png
new file mode 100644
index 0000000000..9c18cb33ce
Binary files /dev/null and b/bigtop-bigpetstore/bigpetstore-spark/images/trigger_dag.png differ
diff --git a/bigtop-deploy/puppet/hieradata/bigtop/cluster.yaml b/bigtop-deploy/puppet/hieradata/bigtop/cluster.yaml
index 6d727b1dfa..69531fe662 100644
--- a/bigtop-deploy/puppet/hieradata/bigtop/cluster.yaml
+++ b/bigtop-deploy/puppet/hieradata/bigtop/cluster.yaml
@@ -217,3 +217,4 @@ ranger::admin::admin_password: "Admin01234"
 airflow::server::executor: "SequentialExecutor"
 airflow::server::load_examples: "True"
 airflow::server::sql_alchemy_conn: "sqlite:////var/lib/airflow/airflow.db"
+airflow::server::install_bigpetstore_example: false
diff --git a/bigtop-deploy/puppet/modules/airflow/manifests/init.pp b/bigtop-deploy/puppet/modules/airflow/manifests/init.pp
index 1c26b3964a..0dd9cb15d3 100644
--- a/bigtop-deploy/puppet/modules/airflow/manifests/init.pp
+++ b/bigtop-deploy/puppet/modules/airflow/manifests/init.pp
@@ -20,7 +20,7 @@
     }
   }
 
-  class server($executor, $load_examples, $sql_alchemy_conn) {
+  class server($executor, $load_examples, $sql_alchemy_conn, $install_bigpetstore_example = false) {
     package { 'airflow':
       ensure => latest,
     }
@@ -55,5 +55,29 @@
       ensure  => running,
       require => Exec['airflow-db-init'],
     }
+
+    if $install_bigpetstore_example {
+      exec { 'install-spark-provider':
+        command     => "/usr/lib/airflow/bin/python3 -m pip install apache-airflow-providers-apache-spark 'pyspark<4'",
+        environment => ['AIRFLOW_HOME=/var/lib/airflow'],
+        user        => 'root',
+        unless      => '/usr/lib/airflow/bin/python3 -m pip show apache-airflow-providers-apache-spark',
+        require     => Package['airflow'],
+      }
+
+      file { '/var/lib/airflow/dags':
+        ensure  => 'directory',
+        owner   => 'airflow',
+        group   => 'airflow',
+        require => Package['airflow'],
+      }
+
+      file { '/var/lib/airflow/dags/example_bigpetstore.py':
+        content => template('airflow/example_bigpetstore.py'),
+        owner   => 'airflow',
+        group   => 'airflow',
+        require => File['/var/lib/airflow/dags'],
+      }
+    }
   }
 }
diff --git a/bigtop-deploy/puppet/modules/airflow/templates/example_bigpetstore.py b/bigtop-deploy/puppet/modules/airflow/templates/example_bigpetstore.py
new file mode 100644
index 0000000000..1b8cb7f2a9
--- /dev/null
+++ b/bigtop-deploy/puppet/modules/airflow/templates/example_bigpetstore.py
@@ -0,0 +1,75 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import pendulum
+
+from airflow.models import DAG
+from airflow.models.param import Param
+from airflow.operators.bash import BashOperator
+from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
+
+with DAG(
+    "bigpetstore_dag",
+    # The JAR path can be overridden at trigger time via the DAG params.
+    params={"bigpetstore_jar_path": Param("bigpetstore-spark-3.6.0-all.jar")},
+    schedule=None,
+    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
+) as dag:
+    # Remove the outputs of any previous run from HDFS and the local filesystem.
+    clean_hdfs_task = BashOperator(
+        bash_command="hdfs dfs -rm -f -r generated_data transformed_data",
+        task_id="clean_hdfs_task",
+    )
+
+    clean_fs_task = BashOperator(
+        bash_command="rm -f /tmp/PetStoreStats.json /tmp/recommendations.json",
+        task_id="clean_fs_task",
+    )
+
+    generate_task = SparkSubmitOperator(
+        application="{{ params.bigpetstore_jar_path }}",
+        application_args=["generated_data", "10", "1000", "365", "345"],
+        java_class="org.apache.bigtop.bigpetstore.spark.generator.SparkDriver",
+        task_id="generate_task"
+    )
+
+    transform_task = SparkSubmitOperator(
+        application="{{ params.bigpetstore_jar_path }}",
+        application_args=["generated_data", "transformed_data"],
+        java_class="org.apache.bigtop.bigpetstore.spark.etl.SparkETL",
+        task_id="transform_task"
+    )
+
+    analyze_task = SparkSubmitOperator(
+        application="{{ params.bigpetstore_jar_path }}",
+        application_args=["transformed_data", "/tmp/PetStoreStats.json"],
+        java_class="org.apache.bigtop.bigpetstore.spark.analytics.PetStoreStatistics",
+        task_id="analyze_task"
+    )
+
+    recommend_task = SparkSubmitOperator(
+        application="{{ params.bigpetstore_jar_path }}",
+        application_args=["transformed_data", "/tmp/recommendations.json"],
+        java_class="org.apache.bigtop.bigpetstore.spark.analytics.RecommendProducts",
+        task_id="recommend_task"
+    )
+
+    # The cleanup tasks run in parallel first; after ETL, the two analytics tasks fan out.
+    [clean_hdfs_task, clean_fs_task] >> generate_task >> transform_task >> [analyze_task, recommend_task]
+
+if __name__ == "__main__":
+    dag.test()
diff --git a/bigtop-packages/src/common/airflow/airflow.default b/bigtop-packages/src/common/airflow/airflow.default
index 3c73385b5f..7fb32a7ead 100644
--- a/bigtop-packages/src/common/airflow/airflow.default
+++ b/bigtop-packages/src/common/airflow/airflow.default
@@ -21,4 +21,5 @@
 #
 # AIRFLOW_CONFIG=
 AIRFLOW_HOME=/var/lib/airflow
+HADOOP_CONF_DIR=/etc/hadoop/conf
 PATH=/usr/lib/airflow/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin
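
A note on testing the DAG template above: because the file ends with a `dag.test()` call, the deployed copy can be executed directly, without involving the scheduler, which is handy when debugging pipeline changes. The following is a sketch, assuming the Puppet run above has already initialized Airflow's metadata DB and installed the file to the path used by the manifest:

```
$ sudo -u airflow AIRFLOW_HOME=/var/lib/airflow \
    /usr/lib/airflow/bin/python3 /var/lib/airflow/dags/example_bigpetstore.py
```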