example_dataproc.py
Source:example_dataproc.py  
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Example Airflow DAG that shows how to use various Dataproc
operators to manage a cluster and submit jobs.
"""
import os

import airflow
from airflow import models
from airflow.providers.google.cloud.operators.dataproc import (
    DataprocClusterCreateOperator, DataprocClusterDeleteOperator, DataprocSubmitJobOperator,
    DataprocUpdateClusterOperator,
)

PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "an-id")
CLUSTER_NAME = os.environ.get("GCP_DATAPROC_CLUSTER_NAME", "example-project")
REGION = os.environ.get("GCP_LOCATION", "europe-west1")
ZONE = os.environ.get("GCP_REGION", "europe-west-1b")
BUCKET = os.environ.get("GCP_DATAPROC_BUCKET", "dataproc-system-tests")
OUTPUT_FOLDER = "wordcount"
OUTPUT_PATH = "gs://{}/{}/".format(BUCKET, OUTPUT_FOLDER)
PYSPARK_MAIN = os.environ.get("PYSPARK_MAIN", "hello_world.py")
PYSPARK_URI = "gs://{}/{}".format(BUCKET, PYSPARK_MAIN)

# Cluster definition
CLUSTER = {
    "project_id": PROJECT_ID,
    "cluster_name": CLUSTER_NAME,
    "config": {
        "master_config": {
            "num_instances": 1,
            "machine_type_uri": "n1-standard-4",
            "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 1024},
        },
        "worker_config": {
            "num_instances": 2,
            "machine_type_uri": "n1-standard-4",
            "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 1024},
        },
    },
}

# Update options
CLUSTER_UPDATE = {
    "config": {
        "worker_config": {"num_instances": 3},
        "secondary_worker_config": {"num_instances": 3},
    }
}
UPDATE_MASK = {
    "paths": [
        "config.worker_config.num_instances",
        "config.secondary_worker_config.num_instances",
    ]
}
TIMEOUT = {"seconds": 1 * 24 * 60 * 60}

# Jobs definitions
PIG_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "pig_job": {"query_list": {"queries": ["define sin HiveUDF('sin');"]}},
}
SPARK_SQL_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "spark_sql_job": {"query_list": {"queries": ["SHOW DATABASES;"]}},
}
SPARK_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "spark_job": {
        "jar_file_uris": ["file:///usr/lib/spark/examples/jars/spark-examples.jar"],
        "main_class": "org.apache.spark.examples.SparkPi",
    },
}
PYSPARK_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "pyspark_job": {"main_python_file_uri": PYSPARK_URI},
}
HIVE_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "hive_job": {"query_list": {"queries": ["SHOW DATABASES;"]}},
}
HADOOP_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "hadoop_job": {
        "main_jar_file_uri": "file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar",
        "args": ["wordcount", "gs://pub/shakespeare/rose.txt", OUTPUT_PATH],
    },
}

with models.DAG(
    "example_gcp_dataproc",
    default_args={"start_date": airflow.utils.dates.days_ago(1)},
    schedule_interval=None,
) as dag:
    create_cluster = DataprocClusterCreateOperator(
        task_id="create_cluster", project_id=PROJECT_ID, cluster=CLUSTER, region=REGION
    )
    scale_cluster = DataprocUpdateClusterOperator(
        task_id="scale_cluster",
        cluster_name=CLUSTER_NAME,
        cluster=CLUSTER_UPDATE,
        update_mask=UPDATE_MASK,
        graceful_decommission_timeout=TIMEOUT,
        project_id=PROJECT_ID,
        location=REGION,
    )
    pig_task = DataprocSubmitJobOperator(
        task_id="pig_task", job=PIG_JOB, location=REGION, project_id=PROJECT_ID
    )
    spark_sql_task = DataprocSubmitJobOperator(
        task_id="spark_sql_task",
        job=SPARK_SQL_JOB,
        location=REGION,
        project_id=PROJECT_ID,
    )
    spark_task = DataprocSubmitJobOperator(
        task_id="spark_task", job=SPARK_JOB, location=REGION, project_id=PROJECT_ID
    )
    pyspark_task = DataprocSubmitJobOperator(
        task_id="pyspark_task", job=PYSPARK_JOB, location=REGION, project_id=PROJECT_ID
    )
    hive_task = DataprocSubmitJobOperator(
        task_id="hive_task", job=HIVE_JOB, location=REGION, project_id=PROJECT_ID
    )
    hadoop_task = DataprocSubmitJobOperator(
        task_id="hadoop_task", job=HADOOP_JOB, location=REGION, project_id=PROJECT_ID
    )
    delete_cluster = DataprocClusterDeleteOperator(
        task_id="delete_cluster",
        project_id=PROJECT_ID,
        cluster_name=CLUSTER_NAME,
        region=REGION,
    )

    create_cluster >> scale_cluster
    scale_cluster >> hive_task >> delete_cluster
    scale_cluster >> pig_task >> delete_cluster
    scale_cluster >> spark_sql_task >> delete_cluster
    scale_cluster >> spark_task >> delete_cluster
    scale_cluster >> pyspark_task >> delete_cluster
...
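Before deploying a DAG file like this one, it can help to confirm that it parses and registers the expected tasks. The snippet below is a minimal pytest-style smoke test sketch; the dags/ folder path and the test name are assumptions, not part of the original example.

# Smoke test sketch: load the DAG folder with a DagBag and check the file parsed cleanly.
# Assumes the listing above is saved somewhere under dags/ (path is an assumption).
from airflow.models import DagBag

def test_example_gcp_dataproc_loads():
    dag_bag = DagBag(dag_folder="dags/", include_examples=False)
    assert not dag_bag.import_errors          # no import or parse errors in the folder
    dag = dag_bag.get_dag("example_gcp_dataproc")
    assert dag is not None
    assert "create_cluster" in dag.task_ids   # task ids taken from the listing above
    assert "delete_cluster" in dag.task_ids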
airflow_data_proc.py
Source:airflow_data_proc.py
import os

from airflow import DAG
from airflow.providers.google.cloud.operators.dataproc import (
    DataprocCreateClusterOperator,
    DataprocDeleteClusterOperator,
    DataprocSubmitJobOperator,
)
from airflow.utils.dates import days_ago

PROJECT_ID = os.environ.get('GCP_PROJECT')
REGION = 'us-central1'
CLUSTER_NAME = 'ephemeral-spark-cluster-{{ ds_nodash }}'
PYSPARK_URI = 'gs://packt-data-eng-on-gcp-data-bucket/chapter-5/code/pyspark_gcs_to_bq.py'

PYSPARK_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "pyspark_job": {
        "main_python_file_uri": PYSPARK_URI,
        "jar_file_uris": ["gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar"],
    },
}

cluster_config_json = {
    "worker_config": {
        "num_instances": 2
    }
}

args = {
    'owner': 'packt-developer',
}

with DAG(
    dag_id='dataproc_ephemeral_cluster_job',
    schedule_interval='0 5 * * *',
    start_date=days_ago(1),
    default_args=args
) as dag:
    create_cluster = DataprocCreateClusterOperator(
        task_id="create_cluster",
        project_id=PROJECT_ID,
        cluster_config=cluster_config_json,
        region=REGION,
        cluster_name=CLUSTER_NAME,
        idle_delete_ttl=600
    )
    pyspark_task = DataprocSubmitJobOperator(
        task_id="pyspark_task", job=PYSPARK_JOB, location=REGION, project_id=PROJECT_ID
    )
    delete_cluster = DataprocDeleteClusterOperator(
        task_id="delete_cluster", project_id=PROJECT_ID, cluster_name=CLUSTER_NAME, region=REGION
    )
...
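The listing is cut off before the task dependencies are wired up. As a hedged sketch (reusing the names defined in the listing above; this is not taken from the original file), a common way to finish an ephemeral-cluster DAG is to chain create, submit, and delete, and give the delete task trigger_rule=TriggerRule.ALL_DONE so the cluster is torn down even when the PySpark job fails.

# Sketch only: reuses PROJECT_ID, REGION, CLUSTER_NAME, create_cluster and pyspark_task
# from the listing above; the trigger_rule choice is an assumption, not the original code.
from airflow.utils.trigger_rule import TriggerRule

delete_cluster = DataprocDeleteClusterOperator(
    task_id="delete_cluster",
    project_id=PROJECT_ID,
    cluster_name=CLUSTER_NAME,
    region=REGION,
    trigger_rule=TriggerRule.ALL_DONE,  # run whether upstream succeeded or failed
)

create_cluster >> pyspark_task >> delete_cluster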
delete_cluster.py
Source:delete_cluster.py
...
import logging

# CONFIG
config = configparser.ConfigParser()
config.read('dwh.cfg')

def delete_cluster():
    """
    Deletes Redshift cluster.
    """
    try:
        redshift = boto3.client('redshift',
                                config['REGION']['AWS_REGION']
        )
        redshift.delete_cluster(
            ClusterIdentifier=config['CLUSTER']['CLUSTER_ID'],
            SkipFinalClusterSnapshot=True
        )
    except Exception as e:
        logging.error(e)

if __name__ == '__main__':
...
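The script reads its settings from dwh.cfg via configparser, and passes the region as the second positional argument of boto3.client, which boto3 interprets as region_name. Based on the keys read above, dwh.cfg would look roughly like the sketch below; the section and key names come from the code, while the values are placeholders.

# dwh.cfg (placeholder values, structure inferred from the code above)
[REGION]
AWS_REGION = us-west-2

[CLUSTER]
CLUSTER_ID = my-redshift-cluster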
