Map-reduce style computation

In this lab, we will build a distributed word count application. But for simplicity, we will pretend that our multi-core machine is itself a distributed system.

We will start stateless Python processes that act as distributed workers. The workers coordinate with each other through Redis, which plays the role of the master (or driver program) in contemporary systems. For further simplicity, we do not separate the control plane from the data plane, even though that separation is crucial for performance in real systems! Workers take tasks from Redis and push word counts back to Redis.

All workers have access to a shared file system for reading and writing large inputs. This is again trivially true on our single system. In a real setup, workers would additionally use a distributed file system like HDFS or a blob store like S3.

Lab instructions

You will use Python, Pandas, and Redis for this lab.

Download the starter code

Create a working directory and navigate to it:

   mkdir -p ws24/lab1 && cd ws24/lab1

Unzip the starter code into the working directory:

   unzip ../starter_code.zip

Note: Adjust the path to starter_code.zip based on where you downloaded the file.

Create and activate a conda environment with Python 3.10:

conda create -n lab1 python=3.10 && conda activate lab1 

Install the required packages inside the conda environment. The requirements.txt file is in the starter code.

pip install -r requirements.txt

Redis Installation:

$ docker run -d -p 6379:6379 --name redis --rm redis:7.4
$ redis-cli --version

# check that you got the right redis-server version
$ redis-cli -h 127.0.0.1 -p 6379 INFO server

$ redis-cli ping
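
You can also verify connectivity from Python using redis-py (installed via requirements.txt). A minimal sanity check, assuming Redis is listening on the default host and port:

# ping_redis.py -- sanity-check that redis-py can reach the server
import redis

r = redis.Redis(host="127.0.0.1", port=6379, decode_responses=True)
assert r.ping()  # raises ConnectionError if the server is unreachable
print(r.info("server")["redis_version"])  # should print 7.4.x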

Running the code

Once you are done with the implementation (the TODOs), run client.py to start the word count application.

python client.py

Download RedisInsight to visualize the Redis streams and sorted sets for better understanding.

Dataset Description

The dataset is available at link. Each CSV file contains 7 attributes; the following is a brief description of each attribute:

Problem Statement

Count the occurrences of each word in a given set of files. Your task is to create an application that can handle a large amount of data, estimated to be in the range of gigabytes.
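
Before distributing anything, it is worth seeing the computation itself. Below is a minimal single-process sketch that counts the words in one CSV file. The column name "text" and the file name are hypothetical placeholders; substitute whichever attribute of the dataset actually holds the words.

# single_count.py -- minimal single-process word count for one CSV file
from collections import Counter

import pandas as pd

def count_words(csv_path: str) -> Counter:
    counts = Counter()
    df = pd.read_csv(csv_path)
    for cell in df["text"].dropna():  # "text" is a hypothetical column name
        counts.update(str(cell).lower().split())
    return counts

if __name__ == "__main__":
    print(count_words("part-0000.csv").most_common(10))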

Overview

Part 0

You should already have Redis running. Familiarize yourself with Redis: learn to send commands to Redis using redis-cli and from Python programs using the redis-py library. In particular, familiarize yourself with sorted sets; you will use them to maintain word counts. You should also read about Redis streams. You need the following stream commands for the first part: xadd, xreadgroup, xgroup_create, and xack (these are the redis-py names; the raw command behind xgroup_create is XGROUP CREATE). Understand what they do. Finally, you will need to write a Redis function for making your tasks idempotent.
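
As a warm-up, the snippet below exercises these commands through redis-py. The key and group names ("wordcounts", "tasks", "workers") are invented for illustration; the starter code defines its own.

import redis

r = redis.Redis(decode_responses=True)

# Sorted sets: maintain word counts.
r.zincrby("wordcounts", 3, "hello")  # add 3 to the score of "hello"
print(r.zrevrange("wordcounts", 0, 9, withscores=True))  # top-10 words

# Streams: a durable task queue with consumer groups.
r.xadd("tasks", {"file": "part-0000.csv"})  # enqueue a task
try:
    r.xgroup_create("tasks", "workers", id="0")  # create the group once
except redis.ResponseError:
    pass  # group already exists

# A worker reads one new task on behalf of consumer "w1" ...
resp = r.xreadgroup("workers", "w1", {"tasks": ">"}, count=1)
for _stream, messages in resp:
    for msg_id, fields in messages:
        print("got task:", fields)
        r.xack("tasks", "workers", msg_id)  # ... and acks it when done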

Part 1: Parallel execution

We will first make the word count application run end-to-end using Redis. (You should already have the starter code from the setup steps above.)

Update DATA_PATH in config.json to point to your data folder. Run python3 client.py. In this lab, you have to modify worker.py and myrds.py.

The basic structure is as follows: client.py enqueues one task per input file onto a Redis stream, and each worker repeatedly claims a task, counts the words in that file, and pushes the counts into a Redis sorted set.
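
A highly simplified sketch of that loop follows. It is illustrative only; the key names and the "text" column are assumptions, and the real skeleton lives in worker.py and myrds.py.

# worker_sketch.py -- illustrative only; the real skeleton is in worker.py/myrds.py
import os
from collections import Counter

import pandas as pd
import redis

r = redis.Redis(decode_responses=True)
CONSUMER = f"worker-{os.getpid()}"  # a unique name for this worker

while True:
    resp = r.xreadgroup("workers", CONSUMER, {"tasks": ">"}, count=1, block=5000)
    if not resp:
        continue  # no new task yet; poll again
    for _stream, messages in resp:
        for msg_id, fields in messages:
            counts = Counter()
            df = pd.read_csv(fields["file"])
            for cell in df["text"].dropna():  # "text" is a hypothetical column
                counts.update(str(cell).lower().split())
            # Parts 2 and 3 fold these two steps into one atomic Redis function.
            for word, n in counts.items():
                r.zincrby("wordcounts", n, word)
            r.xack("tasks", "workers", msg_id)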

Part 2: Fault tolerance of workers

Now we wish to ensure that our system is tolerant to worker failures. Since workers are stateless, we should be OK with losing a worker's in-flight state. But we still have to ensure two things: a task claimed by a crashed worker must eventually be re-delivered to a live worker, and re-processing a task must never double-count words (i.e., tasks must be idempotent). One way to obtain the first property is sketched below.
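
Redis streams keep un-acked deliveries in a per-group pending entries list, so one way to recover tasks from dead workers (an assumption on our part, not necessarily the mechanism the starter code prescribes) is XAUTOCLAIM: any live worker periodically claims pending messages that have been idle for too long.

# reclaim_sketch.py -- steal tasks whose worker has gone silent; illustrative only
import redis

r = redis.Redis(decode_responses=True)

def reclaim_stale_tasks(consumer: str, min_idle_ms: int = 30_000):
    # Returns (next_cursor, [(msg_id, fields), ...], [deleted_ids]) on Redis 7.
    _cursor, messages, _deleted = r.xautoclaim(
        "tasks", "workers", consumer, min_idle_time=min_idle_ms
    )
    return messages  # process these exactly like freshly read tasks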

Part 3: Redis FT using checkpoints

We would like to now ensure that our system tolerates Redis failures. We need not change the worker code for this part. To reason about correctness, note that a Redis instance handles one command after another in a single thread.

In this part, we will periodically create a checkpoint using the BGSAVE command. Each BGSAVE forks a background process that writes a dump.rdb snapshot file to disk.
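
One simple way to drive this (an illustrative assumption; the lab may prescribe its own trigger) is a small loop that requests a snapshot every few seconds through redis-py's bgsave():

# checkpoint_sketch.py -- periodically trigger RDB snapshots; illustrative only
import time

import redis

r = redis.Redis(decode_responses=True)

def checkpoint_loop(interval_s: int = 10):
    while True:
        r.bgsave()  # fork a child that writes dump.rdb in the background
        time.sleep(interval_s)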

You can run CONFIG GET dir from redis-cli to find the directory where dump.rdb gets stored. Try crashing the Redis instance and then starting a new one. Redis should automatically read the dump.rdb file and restore from it. Verify that the new instance has the keys from the old instance by running KEYS * in redis-cli.

Now, while the job is running, try crashing the Redis instance and restarting another one. From a correctness standpoint, checkpoints are consistent because Redis runs a single event loop and because all our edits were made atomic in the previous part.

In other words, say a file foo was processed after the checkpoint was taken. After a failover, the new Redis instance (recovered from the checkpoint) will remember that the file has NOT yet been xacked. A worker will therefore receive the file again for processing, and it will again xack + increment word counts in one atomic operation. Since our workers are stateless and a file's word counts are deterministic, recomputing them is safe.

You can load the Lua library mylib.lua into the Redis instance using the following command:

cat mylib.lua | redis-cli -x FUNCTION LOAD REPLACE

Ensure that you set up the new instance in an identical manner, i.e., it listens on the same port, uses the same password, and has the same Lua functions loaded.
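
To make the idempotence argument concrete, here is a purely hypothetical sketch of what such an atomic increment-and-ack function could look like, loaded and invoked from Python. The library and function names, key layout, and argument order are all invented for illustration; they are not the contents of the starter code's mylib.lua.

# function_sketch.py -- hypothetical atomic increment-and-ack Redis function
import redis

LUA = """#!lua name=sketchlib
redis.register_function('count_and_ack', function(keys, args)
  -- keys[1] = sorted set of word counts, keys[2] = task stream
  -- args = group, msg_id, then (word, count) pairs
  for i = 3, #args, 2 do
    redis.call('ZINCRBY', keys[1], args[i + 1], args[i])
  end
  -- acking in the same call makes increment + ack a single atomic step
  return redis.call('XACK', keys[2], args[1], args[2])
end)
"""

r = redis.Redis(decode_responses=True)
r.execute_command("FUNCTION", "LOAD", "REPLACE", LUA)

# e.g. ack message 1700000000000-0 while adding hello:3, world:1
r.execute_command(
    "FCALL", "count_and_ack", 2, "wordcounts", "tasks",
    "workers", "1700000000000-0", "hello", 3, "world", 1,
)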