
Jan 4, 2022 - 7 minute read - Kubernetes Go Software

SiteMapper Part 2: Distributed crawling using Kubernetes, NATS and Cassandra

Image showing logos of technologies used in this project

In Part 1 of this project series I created a stand-alone CLI tool, written in Go, to build a sitemap of internal links for a given URL to a specified maximum depth. In Part 2 I describe how I achieved the same result of creating a sitemap, but by distributing the crawl activity: I use the Kubernetes API to schedule an independent ephemeral crawl job for each link. I’m using NATS for pod-to-pod messaging and AstraDB (a managed Cassandra DB) for persistence. All of the application code is written in Go.

Overview

Note: The point of doing this was not to provide an optimal solution, but to experiment and play with Kubernetes, Go, NATS and Cassandra.

Link to GitHub repo: https://github.com/dinofizz/sitemapper

While the stand-alone CLI implementation uses Go concurrency primitives to run multiple concurrent crawl tasks for a given root URL, the Kubernetes implementation uses a long-lived “crawl manager” pod which creates ephemeral Kubernetes Job pods, each of which crawls a single URL. A Kubernetes Job runs a pod until completion.

Diagram depicting Kubernetes SiteMapper activities

The crawl manager subscribes to three individual NATS subjects at start-up. Different activities are triggered by the messages received on each of the subjects:

  • a “start” message begins the crawl activity for a root URL
  • a “crawl” message instructs the crawl manager to create a Job pod for a specific URL
  • a “results” message saves the results of a crawl and creates new crawl messages for each result

State for each sitemap, such as the root URL and the maximum crawl depth, is saved to AstraDB, a managed Cassandra database.

The job pods call into the same SiteMapper Go code as the stand-alone implementation, but only crawl a single URL (depth = 1). Instead of writing the results to stdout, a message containing the URL and the links found on it is published to the NATS results subject.

The flow charts below detail how each type of message is handled by the crawl manager.

An “api” deployment exposes a simple REST API for sitemap creation and result retrieval:

  • POST /sitemap with JSON body
    • This creates a “start” NATS message
  • GET /sitemap/<sitemap-id>
    • This returns the sitemap results for the given ID

Sample requests and responses can be found below.

Notes on Kubernetes Deployment

I’m using Skaffold and Helm for deployment to two different Kubernetes clusters:

  • localhost cluster using k3d
  • Raspberry Pi cluster using k3s
    • See my previous blog post on how I originally set this up here

My skaffold.yaml contains a profile for each of the clusters above. For the Pi cluster I use Docker’s buildx tool to build ARM images, which I push to a local Docker registry instance I keep running on my home server.
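
For reference, the buildx invocation looks something like the following (the registry host and target platform are placeholders; adjust them for your own registry and Pi architecture):

$ docker buildx build --platform linux/arm64 \
    -t <local-registry>:5000/sitemapper:latest \
    --push .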

NATS

Having previously worked with Apache Kafka, and having some limited experience with RabbitMQ, I decided to use a messaging technology that I was unfamiliar with: NATS.

NATS offers a few different messaging models from simple pub/sub to something a bit more like Kafka, featuring distributed persistence (“NATS JetStream”).

For the SiteMapper project I’m just using the simple pub/sub features of “core” NATS.

NATS is deployed to the Kubernetes cluster using a Helm chart:

$ helm repo add nats https://nats-io.github.io/k8s/helm/charts/
$ helm install nats nats/nats

Link to SiteMapper NATS code here.
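
Roughly, the crawl manager’s pub/sub wiring with the official Go client looks like the sketch below. The subject names and the NATS URL are placeholders (the Helm chart’s default service name is “nats”); see the repo for the real values.

package main

import (
	"log"

	"github.com/nats-io/nats.go"
)

func main() {
	// Connect to the NATS service deployed by the Helm chart (default service name "nats").
	nc, err := nats.Connect("nats://nats:4222")
	if err != nil {
		log.Fatal(err)
	}

	// One subscription per activity. The subject names here are illustrative,
	// not necessarily those used in the repo.
	subjects := map[string]nats.MsgHandler{
		"sitemap.start":   func(m *nats.Msg) { log.Printf("start: %s", m.Data) },
		"sitemap.crawl":   func(m *nats.Msg) { log.Printf("crawl: %s", m.Data) },
		"sitemap.results": func(m *nats.Msg) { log.Printf("results: %s", m.Data) },
	}
	for subject, handler := range subjects {
		if _, err := nc.Subscribe(subject, handler); err != nil {
			log.Fatal(err)
		}
	}

	// A real service would trap signals and call nc.Drain() on shutdown;
	// this sketch just blocks so the asynchronous handlers keep running.
	select {}
}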

start message handling

Example start message:

{
  "SitemapID": "1fb9370a-68b0-11ec-b17f-2fd91b6befcc",
  "URL": "https://google.com",
  "MaxDepth": 3
}

“start” message handling

crawl message handling

Example crawl message:

{
  "CrawlID": "49d75b94-68b0-11ec-a451-4f47ed125bea",
  "SitemapID": "1fb9370a-68b0-11ec-b17f-2fd91b6befcc",
  "URL": "https://google.com",
  "CurrentDepth": 1
}

“crawl” message handling

results message handling

Example results message:

{
  "CrawlID": "49d75b94-68b0-11ec-a451-4f47ed125bea",
  "Results": [
    {
      "URL": "https://google.com",
      "Links": [
        "https://accounts.google.com/ServiceLogin",
        "https://drive.google.com/",
        "https://mail.google.com/mail/",
        "https://news.google.com/",
        "https://play.google.com/",
        "https://www.google.com/advanced_search",
        "https://www.google.com/intl/en/about.html",
        "https://www.google.com/intl/en/ads/",
        "https://www.google.com/intl/en/policies/privacy/",
        "https://www.google.com/intl/en/policies/terms/",
        "https://www.google.com/preferences",
        "https://www.google.com/services/",
        "https://www.google.com/setprefdomain"
      ]
    }
  ]
}

“results” message handling
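
For reference, the three payloads above map onto Go structs along the lines of the sketch below (the type names and the results subject name are mine, not necessarily the repo’s). A Job pod can marshal its results with encoding/json and publish them to the results subject.

package messages

import (
	"encoding/json"

	"github.com/nats-io/nats.go"
)

// StartMessage kicks off a new sitemap for a root URL.
type StartMessage struct {
	SitemapID string
	URL       string
	MaxDepth  int
}

// CrawlMessage asks the crawl manager to schedule a Job for a single URL.
type CrawlMessage struct {
	CrawlID      string
	SitemapID    string
	URL          string
	CurrentDepth int
}

// CrawlResult holds the links discovered for one crawled URL.
type CrawlResult struct {
	URL   string
	Links []string
}

// ResultsMessage is published by a Job pod when its crawl completes.
type ResultsMessage struct {
	CrawlID string
	Results []CrawlResult
}

// PublishResults marshals a results message and publishes it. The subject
// name is a placeholder.
func PublishResults(nc *nats.Conn, msg ResultsMessage) error {
	data, err := json.Marshal(msg)
	if err != nil {
		return err
	}
	return nc.Publish("sitemap.results", data)
}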

AstraDB

I’ve been enjoying exploring NoSQL databases. I’m currently playing around with Apache Cassandra.

For maintaining sitemap results and crawl job state data I decided to make use of the free tier offered by DataStax’s AstraDB. AstraDB is a Cassandra DB-as-a-service.

These are my Cassandra table descriptions:

CREATE TABLE sitemapper.sitemaps (                                             
    sitemap_id timeuuid PRIMARY KEY,
    max_depth int,                  
    url text      
)

CREATE TABLE sitemapper.crawl_jobs (
    crawl_id timeuuid,
    sitemap_id timeuuid,
    depth int,
    max_depth int,
    status text,
    url text,
    PRIMARY KEY (crawl_id, sitemap_id)
) WITH CLUSTERING ORDER BY (sitemap_id DESC)

CREATE TABLE sitemapper.results_by_sitemap_id (
    sitemap_id timeuuid,
    url text,
    crawl_id timeuuid,
    links set<text>,
    PRIMARY KEY (sitemap_id, url)
) WITH CLUSTERING ORDER BY (url ASC)

To connect to my AstraDB instance I’m using gocql with a helper package, easy-cass-go. Authenticating and connecting to AstraDB using just gocql is a bit cumbersome, and involves extracting values from text files inside a ZIP file provided by AstraDB. easy-cass-go makes things much easier: it accepts a client ID, client secret and a path to the ZIP file, and returns a gocql session instance.

Link to Go code for AstraDB here.
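
To give an idea of how these tables are used from Go, here is a minimal gocql sketch. It assumes a *gocql.Session has already been obtained (via easy-cass-go in this project); the package and function names are mine, not the repo’s.

package store

import "github.com/gocql/gocql"

// SaveSitemap records a new sitemap row and returns its generated timeuuid ID.
func SaveSitemap(session *gocql.Session, url string, maxDepth int) (gocql.UUID, error) {
	id := gocql.TimeUUID() // sitemap_id is a timeuuid
	err := session.Query(
		`INSERT INTO sitemapper.sitemaps (sitemap_id, max_depth, url) VALUES (?, ?, ?)`,
		id, maxDepth, url,
	).Exec()
	return id, err
}

// ResultsForSitemap reads back the crawl results for a sitemap ID. gocql maps
// the Cassandra set<text> column onto a []string.
func ResultsForSitemap(session *gocql.Session, sitemapID gocql.UUID) (map[string][]string, error) {
	results := map[string][]string{}
	iter := session.Query(
		`SELECT url, links FROM sitemapper.results_by_sitemap_id WHERE sitemap_id = ?`,
		sitemapID,
	).Iter()
	var url string
	var links []string
	for iter.Scan(&url, &links) {
		results[url] = links
		links = nil // start with a fresh slice for the next row
	}
	return results, iter.Close()
}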

The AstraDB client ID, client secret and path to ZIP file are read from environment variables sourced from a Kubernetes secret:

kubectl create secret generic astra-auth \
  --from-literal=clientID='<client id>' \
  --from-literal=clientSecret='<client secret>' \
  --from-literal=zipPath='/astra/secure-connect-sitemapper.zip'
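
At start-up the crawl manager then only needs to read these three values before handing them to the easy-cass-go helper. A fail-fast sketch (the environment variable names assume the secret keys are exposed unchanged via envFrom):

package astra

import (
	"fmt"
	"os"
)

// Config holds the values injected from the astra-auth Kubernetes secret.
type Config struct {
	ClientID     string
	ClientSecret string
	ZipPath      string
}

// ConfigFromEnv reads the AstraDB connection settings and fails fast if any
// of them are missing. The resulting values are passed to easy-cass-go, which
// returns the gocql session used by the rest of the application.
func ConfigFromEnv() (Config, error) {
	cfg := Config{
		ClientID:     os.Getenv("clientID"),
		ClientSecret: os.Getenv("clientSecret"),
		ZipPath:      os.Getenv("zipPath"),
	}
	if cfg.ClientID == "" || cfg.ClientSecret == "" || cfg.ZipPath == "" {
		return cfg, fmt.Errorf("missing AstraDB configuration in environment")
	}
	return cfg, nil
}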

Kubernetes Jobs

To create the Job pods from within the crawl manager I am using the Kubernetes Go client library. Here’s a snippet of the code which creates the jobs:

func (jm *JobManager) CreateJob(crawlID uuid.UUID, url string) error {
	cid := crawlID.String()
	jobs := jm.clientset.BatchV1().Jobs(jm.namespace)
	var backOffLimit int32 = 0
	cmd := fmt.Sprintf("/sitemapper -s %s --id %s", url, crawlID)

	jobSpec := &batchv1.Job{
		ObjectMeta: metav1.ObjectMeta{
			Name:      fmt.Sprintf("crawl-job-%s", crawlID),
			Namespace: jm.namespace,
			Labels: map[string]string{
				"crawl-id": cid,
			},
		},
		Spec: batchv1.JobSpec{
			TTLSecondsAfterFinished: &jm.ttl,
			Template: v1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{
					Name:      fmt.Sprintf("crawl-pod-%s", crawlID),
					Namespace: jm.namespace,
					Labels: map[string]string{
						"crawl-id": cid,
					},
				},
				Spec: v1.PodSpec{
					Containers: []v1.Container{
						{
							Name:            "sitemapper",
							Image:           jm.jobImage,
							ImagePullPolicy: v1.PullIfNotPresent,
							Command:         strings.Split(cmd, " "),
							EnvFrom: []v1.EnvFromSource{{
								ConfigMapRef: &v1.ConfigMapEnvSource{
									LocalObjectReference: v1.LocalObjectReference{
										Name: "sitemapper",
									},
								},
							}},
						},
					},
					RestartPolicy: v1.RestartPolicyNever,
					NodeSelector: map[string]string{
						"k3s-role": "agent",
					},
				},
			},
			BackoffLimit: &backOffLimit,
		},
	}

	j, err := jobs.Create(context.TODO(), jobSpec, metav1.CreateOptions{})
	if err != nil {
		log.Printf("Failed to create job: %s\n", err)
		return err
	}

	log.Printf("Created job %s successfully", j.Name)
	return nil
}

Link to SiteMapper Kubernetes Go code here.

Notes:

  • The job pod runs a container image containing the SiteMapper job binary and passes in the crawl ID and URL as CLI arguments.
    • Note that the job binary is slightly different to the stand-alone binary, but they call into the same internal crawl engine code.
  • Configuration values for sending the NATS results message are read from environment variables sourced from a ConfigMap.
  • Specifying a “TTLSecondsAfterFinished” means that the jobs and related pods will automatically be removed from the namespace the specified number of seconds after completion.
  • I’ve set the pod restartPolicy = Never and the job backOffLimit = 0. This means that if a crawl job fails, it’s not rerun or recreated (I currently don’t have any crawl job error handling).
  • In testing I had an issue where I accidentally scheduled 1000s of jobs simultaneously and my k3s cluster became unresponsive, so I’ve also specified a ResourceQuota which limits the number of concurrent active jobs (link).

API

Link to Go code for the API here.
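
The POST handler essentially generates a sitemap ID, publishes a “start” message, and echoes the request back with the new ID. A minimal net/http sketch is shown below; the handler shape, struct names and subject name are illustrative rather than lifted from the repo.

package main

import (
	"encoding/json"
	"net/http"

	"github.com/gocql/gocql"
	"github.com/nats-io/nats.go"
)

type sitemapRequest struct {
	URL      string
	MaxDepth int
}

type sitemapResponse struct {
	URL       string
	MaxDepth  int
	SitemapID string
}

// newSitemapHandler returns a handler for POST /sitemap: it assigns a new
// timeuuid sitemap ID, publishes a "start" message and returns the ID to the
// caller. The subject name is a placeholder.
func newSitemapHandler(nc *nats.Conn) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		var req sitemapRequest
		if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}

		resp := sitemapResponse{
			URL:       req.URL,
			MaxDepth:  req.MaxDepth,
			SitemapID: gocql.TimeUUID().String(), // timeuuid, matching the sitemaps table
		}

		// In this sketch the response body and the "start" message happen to
		// share the same shape.
		data, err := json.Marshal(resp)
		if err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
		if err := nc.Publish("sitemap.start", data); err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}

		w.Header().Set("Content-Type", "application/json")
		w.Write(data)
	}
}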

Usage example

First retrieve the NODE_IP and NODE_PORT for the sitemapper service:

export NODE_PORT=$(kubectl get --namespace sitemapper -o jsonpath="{.spec.ports[0].nodePort}" services sitemapper)
export NODE_IP=$(kubectl get nodes --namespace sitemapper -o jsonpath="{.items[0].status.addresses[0].address}")

Send a POST to /sitemap with the URL and MaxDepth parameters:

$ curl -s -X POST http://$NODE_IP:$NODE_PORT/sitemap -d '{"URL":"https://www.google.com","MaxDepth":2}' | jq
{
  "URL": "https://www.google.com",
  "MaxDepth": 2,
  "SitemapID": "918e9d19-6c91-11ec-8f5b-9269ffb7ee39"
}

Once the crawl jobs have all completed you can retrieve the results using the sitemap ID returned in the response above:

$ curl -s http://$NODE_IP:$NODE_PORT/sitemap/918e9d19-6c91-11ec-8f5b-9269ffb7ee39 | jq
{
  "SitemapID": "918e9d19-6c91-11ec-8f5b-9269ffb7ee39",
  "MaxDepth": 2,
  "URL": "https://www.google.com",
  "Results": [
    {
      "URL": "https://www.google.com",
      "Links": [
        "https://www.google.com/advanced_search",
        "https://www.google.com/intl/en/about.html",
        "https://www.google.com/intl/en/ads/",
        "https://www.google.com/intl/en/policies/privacy/",
        "https://www.google.com/intl/en/policies/terms/",
        "https://www.google.com/preferences",
        "https://www.google.com/services/",
        "https://www.google.com/setprefdomain"
      ]
    },
    {
      "URL": "https://www.google.com/advanced_search",
      "Links": [
        "https://www.google.com/chrome/",
        "https://www.google.com/finance",
        "https://www.google.com/preferences",
        "https://www.google.com/travel/",
        "https://www.google.com/url",
        "https://www.google.com/webhp"
      ]
    },
<----- results truncated ------>

TODOs

  • Identify a mechanism for knowing when a sitemap is completed (i.e. all jobs completed).
  • Perhaps use a schema-based encoding for the messages, protobufs?
  • Handle failed crawl jobs, or at least report the failures somewhere visible to the end user.
  • Write tests. After writing a handful of tests for the stand-alone SiteMapper tool I got too excited with Kubernetes things.