Dino Fizzotti

Engineer, maker, hacker, thinker, funner.

Oct 14, 2018 - 6 minute read - Comments - Software Python

CarbAlert - Part 3: Celery, Mailgun and Flower

This is part 3 of a 4 part series of articles where I explain how I discovered and purchased my laptop by building a web application which scrapes a local PC parts forum and sends automated email alerts when posts featuring specific keywords appear:

CarbAlert on GitHub: https://github.com/dinofizz/carbalert

Celery

Celery is a distributed task queue framework. In conjunction with a message broker (in my case Redis) it can be used to process asynchronous tasks as well as schedule periodic tasks. I am using both of these features:

  • A periodic task is run every 5 minutes to initiate the Scrapy CarbSpider to scrape and scan the first page of the Carbonite Laptop forum index page for new threads featuring search phrases of interest.
  • From within the CarbPipeline activity I push asynchronous email tasks for Celery to handle. This separates the sending of my email notifications from the parsing of the thread metadata.

My Celery tasks are defined in a tasks.py file within my CarbAlert project. I must admit that I did battle quite a bit with my Celery configuration. Import statements and task recognition depend on the location from which the Celery process is invoked (I think). I also need to reference my Django models in a Scrapy pipeline which is kicked off from a Celery task, and I definitely got a bit confused trying to get everything to play nicely together. Honestly this was one of those situations where I tried many different things, and once I had it working I kind of left it at that.

From what I can recall my specific problems were:

  • Import statements for Django models in the CarbPipeline.py file (which is a Scrapy process initiated by a Celery task).
  • Using the correct/similar/same decorators for the actual Celery tasks. I recall trying to understand why my tasks were not being recognised at Celery start-up.

I do want to return to Celery and try resolve all the things I didn’t understand. Next project.

The code for my tasks can be found below. The scrape_carbonite task is run every 5 minutes and simply kicks off a CrawlerProcess with my CarbSpider. The send_email_notification tasks is initiated from within the CarbPipeline. I am using the Requests library to send an HTTP request to the Mailgun API which in turn will send an email to the user associated with the search phrase hits for a forum post. More on Mailgun later.

app.conf.beat_schedule = {
    "scrape-every-300-seconds": {
        "task": "carbalert.carbalert_scrapy.carbalert_scrapy.tasks.scrape_carbonite",
        "schedule": 300.0,
    }
}

@app.task
def scrape_carbonite():
    process = CrawlerProcess(settings=get_project_settings())
    process.crawl(CarbSpider)
    process.start()


@shared_task(base=MailgunAPITask, bind=True)
def send_email_notification(
    self, email_address, phrases, title, text, thread_url, thread_datetime
):
    logger.info(f"Received alert for {email_address} for thread title: {title}")
    subject = f"CARBALERT: {title}"

    phrase_list = ""

    for phrase in phrases:
        phrase_list += f"{phrase}\n"

    text = f"{phrase_list}\n{thread_datetime}\n\n{title}\n\n{text}\n\n{thread_url}\n\nEND\n"

    mailgun_url = f"https://api.mailgun.net/v3/{self.mailgun_domain}/messages"
    mailgun_from = f"CarbAlert <{self.mailgun_email}>"

    try:
        logger.info(f"Sending mail to {email_address}")
        response = requests.post(
            mailgun_url,
            auth=("api", self.mailgun_api_key),
            data={
                "from": mailgun_from,
                "to": [email_address],
                "subject": subject,
                "text": text,
            },
        )
        if response.status_code is not 200:
            logger.error(
                f"Unexpected error code received on Mailgun response for email to {email_address}. "
                f"Code: {response.status_code}, Raw {response.raw}"
            )
            response.raise_for_status()
    except Exception as ex:
        logger.error(f"Error sending mail to {email_address}: {ex}")
        raise ex

Link to code: https://github.com/dinofizz/carbalert/blob/master/carbalert/carbalert_scrapy/carbalert_scrapy/tasks.py

My docker-compose.yml files specifies the command line arguments for the celery worker and celery beat processes, as shown below.

Mailgun

I decided that I wanted to try out a transactional email API for my email notification feature for this CarbAlert project. Mailgun offers an easy to use API with a pay-as-you-go tier that allows for up to 10000 emails to be sent every month for free. As this was a personal project where I had at most two active users during its lifetime I was not worried about sending more then 10000 emails a month. From the Mailgun dashboard you can see the number of emails sent, and how many were delivered successfully and how many failed delivery.

Mailgun dashboard Mailgun dashboard

Mailgun setup requires one to register a Mailgun account and create a Mailgun “domain” (this is not an actual internet domain, more like an application which is associated with one of your own real internet domain names). To allow Mailgun to send and receive email associated with your actual domain you will need to add a few TXT entries to your DNS configuration. For each registered Mailgun domain you will be given an API key which can be used to send and receive email via HTTP requests to the Mailgun API. This is all explained really well in the Mailgun documentation.

To use the Mailgun API from within my Celery task I pass in the domain, “from” email address and the API key via command line arguments to the celery worker process:

$ python -m celery -A carbalert.carbalert_scrapy.carbalert_scrapy.tasks worker
     --loglevel=info -f celery_worker.log --max-tasks-per-child 1
     --email "${MAILGUN_EMAIL}" --key ${MAILGUN_API_KEY} --domain ${MAILGUN_DOMAIN}

The command line argument key words are are registered within the tasks.py file. The values for the arguments are bound to properties in a MailgunAPITask class. This is the “base’ task for my send_email_notification task specified above, and so the properties are directly accessible from within the task function.

See below the Celery configuration which binds the arguments to the properties:

class MailgunAPITask(Task):
    abstract = True
    mailgun_api_key = None
    mailgun_email = None
    mailgun_domain = None


class MailgunArgs(bootsteps.Step):
    def __init__(
        self, worker, mailgun_domain, mailgun_email, mailgun_api_key, **options
    ):
        MailgunAPITask.mailgun_domain = mailgun_domain[0]
        MailgunAPITask.mailgun_email = mailgun_email[0]
        MailgunAPITask.mailgun_api_key = mailgun_api_key[0]


logger = get_task_logger(__name__)
app = Celery("tasks")
app.conf.broker_url = "redis://redis:6379/0"
app.user_options["worker"].add(
    Option("--domain", dest="mailgun_domain", default=None, help="Mailgun domain")
)

app.user_options["worker"].add(
    Option(
        "--email",
        dest="mailgun_email",
        default=None,
        help='Mailgun "from" email address.',
    )
)

app.user_options["worker"].add(
    Option("--key", dest="mailgun_api_key", default=None, help="Mailgun API key")
)

app.steps["worker"].add(MailgunArgs)

Link to code: https://github.com/dinofizz/carbalert/blob/master/carbalert/carbalert_scrapy/carbalert_scrapy/tasks.py

Celery apps in Docker containers

See below the relevant sections from my docker-compose.yml file which describes the command line parameters and environment variables used to start the Celery processes. The “worker” and the “beat” Celery process each run in an independant container:

...
  celery_worker:
    build: .
    working_dir: /code
    command: celery -A carbalert.carbalert_scrapy.carbalert_scrapy.tasks worker --loglevel=info -f celery_worker.log --max-tasks-per-child 1 --email "${MAILGUN_EMAIL}" --key ${MAILGUN_API_KEY} --domain ${MAILGUN_DOMAIN}
    volumes:
      - .:/code
    depends_on:
      - web
      - redis
    environment:
      - SCRAPY_SETTINGS_MODULE=carbalert.carbalert_scrapy.carbalert_scrapy.settings

  celery_beat:
    build: .
    working_dir: /code
    command: celery -A carbalert.carbalert_scrapy.carbalert_scrapy.tasks beat --loglevel=info -f celery_beat.log
    volumes:
      - .:/code
    depends_on:
      - celery_worker
...

The complete listing for the tasks.py class can be found here:

Flower

I’m using Flower as a front-end for monitoring the CarbAlert Celery tasks. This is useful as I can see time-series data for the Carbonite scraping and email sending task, the data that is being passed to the tasks as well as the status of the completed task. There is very little configuration, as it will inspect your existing Celery configuration to determine the tasks to inspect and which broker is being used.

The Flower portal is available on my host at http://carbalert.dinofizzotti.com/flower. Flower provides a few different authentication mechanisms to control access to the Celery metrics. I am using GitHub OAuth, with the required OAuth parameters being passed into the process via environment variables.

My docker-compose.yml file contains the command line entry and arguments for running my Flower instance (in its own container):

...
  celery_flower:
    build: .
    working_dir: /code
    command: celery -A carbalert.carbalert_scrapy.carbalert_scrapy.tasks flower --loglevel=debug --auth_provider=flower.views.auth.GithubLoginHandler --auth=${FLOWER_OAUTH2_EMAIL} --oauth2_key=${FLOWER_OAUTH2_KEY} --oauth2_secret=${FLOWER_OAUTH2_SECRET} --oauth2_redirect_uri=${FLOWER_OAUTH2_REDIRECT_URI} --url_prefix=flower
    ports:
      - "5555:5555"
    depends_on:
      - celery_worker
...

See below some screenshots from Flower:

Flower GitHub OAuth Flower GitHub OAuth
Flower Flower
Flower: Celery tasks Flower: Celery tasks
Flower: Celery task detail Flower: Celery task detail

Flower: Monitoring Flower: Monitoring

Next post in series: Part 4: Deploying and using CarbAlert