<span style="font-size:24px;">Installing PyFlink<span>

<span style="font-size:16px;">Prerequisites:</span>

1. <span style="font-size:16px;">Python 3.8+<span>
2. <span style="font-size:16px;">Java (JDK 8 or 11)<span>
3. <span style="font-size:16px;">Pip: Python's package manager, version 19.3 or later.<span>

<span style="font-size:16px;">To get started with PyFlink:<span>

1. <span style="font-size:16px;">Set up a virtual environment:</span>
    - <span style="font-size:16px;">Using the command prompt: python3.xx -m venv pyflink-venv (replace python3.xx with your Python version)</span>
    - <span style="font-size:16px;">Activate the virtual environment:<span>
        - <span style="font-size:16px;">Windows: pyflink-venv\Scripts\activate.bat<span>
        - <span style="font-size:16px;">Linux: source pyflink-venv/bin/activate<span>
        - <span style="font-size:16px;">macOS: source pyflink-venv/bin/activate</span>
    - <span style="font-size:16px;">Or, if you are working in a notebook, select the virtual environment as the kernel.</span>
2. <span style="font-size:16px;">Install PyFlink via pip:<span>

In [None]:
%pip install apache-flink

<span style="font-size:16px;">We can verify the installation by running the following code block.</span>

In [None]:
from importlib.metadata import version

# Print the version of the installed apache-flink package
print(version("apache-flink"))

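<span style="font-size:16px;">The pip package can run jobs locally in an embedded mini cluster, which is enough for testing. To run a standalone Flink cluster, submit jobs to it, and monitor them in the Web UI, you also need a Flink distribution.</span>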

- <span style="font-size:16px;">Step 1: [Download Flink](https://flink.apache.org/downloads.html)<span>
- <span style="font-size:16px;">Select a stable release (e.g., Flink 1.17.x) that matches the version of the apache-flink package you installed with pip.</span>
- <span style="font-size:16px;">Step 2: Extract the Flink archive using the command prompt:</span>

    - <span style="font-size:16px;">tar -xzf flink-*-bin-scala_*.tgz</span>
    - <span style="font-size:16px;">Move the extracted directory to a suitable location (e.g., /opt/flink):</span>

        <span style="font-size:16px;">sudo mv flink-*/ /opt/flink</span>
- <span style="font-size:16px;">Step 3: Add Flink to the PATH<span>
    - <span style="font-size:16px;">Update your shell configuration (~/.bashrc, ~/.zshrc, etc.)<span>

        <span style="font-size:16px;">export FLINK_HOME=/opt/flink<span>
        <span style="font-size:16px;">export PATH=$FLINK_HOME/bin:$PATH<span>
    - <span style="font-size:16px;">Source the updated configuration file for your shell (bash or zsh, respectively):</span>

        <span style="font-size:16px;">source ~/.bashrc</span>

        <span style="font-size:16px;">source ~/.zshrc</span>
    - <span style="font-size:16px;">Verify the installation<span>
    
        <span style="font-size:16px;">flink --version<span>

<span style="font-size:16px;">We can now start a local Flink cluster using the following command:<span>

<span style="font-size:16px;"> $FLINK_HOME/bin/start-cluster.sh <span>

<span style="font-size:16px;">This also starts the Flink Web UI, which can be accessed at [http://localhost:8081](http://localhost:8081)</span>

<span style="font-size:16px;">We can test our installation by running the following code block:</span>


In [1]:
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
env.from_collection([1, 2, 3, 4]).print()
env.execute("Test PyFlink Job")

5> 1
8> 4
7> 3
6> 2


<pyflink.common.job_execution_result.JobExecutionResult at 0x11caab550>

<span style="font-size:16px;">Jobs executed from the notebook run in PyFlink's embedded local environment, not on the cluster started above, so they do not appear in its Web UI. To run our code on the cluster and observe it in the UI, we first save the code in a Python file and then submit it with the flink command:</span>

<span style="font-size:16px;">$FLINK_HOME/bin/flink run -py my_flink_job.py<span>

<span style="font-size:16px;">Once we are done with our work, we can stop the cluster using the command:<span>

<span style="font-size:16px;">$FLINK_HOME/bin/stop-cluster.sh<span>

<span style="font-size:24px;">Basic Operations in PyFlink (DataStream API)</span>

<span style="font-size:16px;">1. Creating a Stream Execution Environment<span>

In [None]:
# Iterable comes from the standard typing module, not from pyflink.common.typeinfo
from typing import Iterable

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream.connectors.file_system import FileSource, StreamFormat, FileSink, OutputFileConfig, RollingPolicy
from pyflink.common.watermark_strategy import WatermarkStrategy
from pyflink.common.serialization import Encoder
from pyflink.common.typeinfo import Types
from pyflink.common.time import Time
from pyflink.datastream.window import TumblingEventTimeWindows, TimeWindow
from pyflink.datastream.functions import WindowFunction, AllWindowFunction, ReduceFunction

env = StreamExecutionEnvironment.get_execution_environment()

<span style="font-size:16px;">The next step is to create a DataStream. We first create a source stream that reads a file line by line, and then apply transformations to it.</span>

In [2]:
input_path = "ws-logs_filtered.csv"

ds = env.from_source(
    source=FileSource.for_record_stream_format(StreamFormat.text_line_format(),
                                               input_path)
                     .process_static_file_set().build(),
    watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
    source_name="file_source"
)

<span style="font-size:16px;">We can now add transformations to our stream. Here, each line is split on commas and the occurrences of each field value are counted. Every pipeline ends in a sink; in this case, the sink simply prints the results.</span>

In [3]:
def split(line):
        yield from line.split(',')
        
ds = ds.flat_map(split) \
        .map(lambda i: (i, 1), output_type=Types.TUPLE([Types.STRING(), Types.INT()])) \
        .key_by(lambda i: i[0]) \
        .reduce(lambda i, j: (i[0], i[1] + j[1]))
        
ds.print()

<pyflink.datastream.data_stream.DataStreamSink at 0x153a40510>

<span style="font-size:16px;">Finally, we run the pipeline with the execute() method.</span>

In [None]:
env.execute("Test PyFlink Job")

1> (Event context,1)
1> (17/11/24; 00:01:24,1)
1> (User report,1)
1> (mod_vpl: vpl description viewed,1)
1> (mod_vpl: submission edited,1)
1> (mod_vpl: vpl description viewed,2)
1> (User report,2)
1> (Tour ended,1)
1> (Tour started,1)
1> (The user with id '1918' has viewed their dashboard,1)
1> (User report,3)
1> (17/11/24; 00:02:57,1)
1> (mod_vpl: vpl description viewed,3)
1> (mod_vpl: vpl description viewed,4)
1> (User: Lisa Jenkins,1)
1> (The user with id '2055' viewed the 'quiz' activity with course module id '7'.,1)
1> (17/11/24; 00:04:10,1)
1> (Tour started,2)
1> (17/11/24; 00:04:15,1)
1> (Tour ended,2)
1> (Tour started,3)
1> (Tour ended,3)
1> (The user with id '1917' has viewed the discussion with id '2' in the forum with course module id '3'.,1)
1> (The user with id 1918 viewed  VPL submission with id 122,1)
1> (mod_vpl: vpl description viewed,5)
1> (mod_vpl: submission edited,2)
1> (mod_vpl: vpl description viewed,6)
1> (17/11/24; 00:05:22,1)
1> (mod_vpl: submission run,1)
1> 

<span style="font-size:16px;">We can also create a stream from a Python collection using the from_collection() method.</span>

In [None]:
word_count_data =[
    "Hello World",
    "This is a test",
    "Hello Flink"
]

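# Declare the element type so that downstream sinks receive plain strings
ds = env.from_collection(word_count_data, type_info=Types.STRING())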

<span style="font-size:16px;">Instead of simply printing the output, we can also write it to a file as a sink.<span>

In [None]:
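# Hypothetical output directory; change it to a path on your machine
output_path = "flink-output"

ds.sink_to(
    sink=FileSink.for_row_format(
        base_path=output_path,
        encoder=Encoder.simple_string_encoder())
    .with_output_file_config(
        OutputFileConfig.builder()
        .with_part_prefix("prefix")
        .with_part_suffix(".ext")
        .build())
    .with_rolling_policy(RollingPolicy.default_rolling_policy())
    .build()
)

# Nothing is written until the job runs. In STREAMING mode, part files are only
# finalized on checkpoints, so enable checkpointing or use the BATCH runtime mode.
env.execute("File Sink Example")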

<span style="font-size:24px;">What is Watermarking?<span>

<span style="font-size:16px;">Watermarking in Apache Flink is a mechanism to handle event-time processing by dealing with out-of-order events. Events in distributed systems often arrive with delays due to network latencies, retries, or other factors. Flink uses watermarks to determine when it is safe to process events within a specific time window while still accommodating late arrivals.<span>

<span style="font-size:16px;">Key Concepts<span>

- <span style="font-size:16px;">Event Time: The timestamp of when the event occurred (e.g., as recorded in a log entry or message). This differs from processing time, which is when the event is processed by Flink.</span>

- <span style="font-size:16px;">Watermark: A special marker that flows through the stream and tracks the progress of event time. A watermark with timestamp t declares that no more events with a timestamp less than or equal to t are expected. Watermarks usually trail the largest timestamp seen so far by a fixed bound (the maximum out-of-orderness), which gives delayed events time to arrive.</span>

- <span style="font-size:16px;">Late Data: Events that arrive after the watermark has already passed the end of their event-time window. By default, Flink drops them; an application can instead keep them for a while with allowed lateness or route them to a side output.</span>

<span style="font-size:16px;">How Watermarks Work: Watermarks are generated periodically based on the event timestamps. Flink propagates watermarks downstream to let operators and windows know how far event-time has progressed. Late events (with timestamps less than the watermark) are either ignored or handled as specified.<span>
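<span style="font-size:16px;">The mechanism described above can be sketched in plain Python. This is a simplified simulation, not PyFlink code: it assumes a bounded-out-of-orderness strategy (the kind created with WatermarkStrategy.for_bounded_out_of_orderness), recomputes the watermark after every event instead of periodically, and treats an event as late when its timestamp is at or below the current watermark. simulate_watermarks is a hypothetical helper name.</span>

```python
from typing import List, Optional, Tuple


def simulate_watermarks(timestamps: List[int], max_out_of_orderness_ms: int) -> List[Tuple[int, int, bool]]:
    """Return (event timestamp, watermark after the event, was_late) for each event.

    Simplified model: the watermark trails the largest timestamp seen so far by the
    out-of-orderness bound (minus 1 ms), and it is recomputed after every event
    rather than periodically.
    """
    max_seen: Optional[int] = None
    watermark: Optional[int] = None  # no watermark exists before the first event
    results = []
    for ts in timestamps:
        # Late: the current watermark already declared that no events at or before ts would come
        was_late = watermark is not None and ts <= watermark
        max_seen = ts if max_seen is None else max(max_seen, ts)
        watermark = max_seen - max_out_of_orderness_ms - 1
        results.append((ts, watermark, was_late))
    return results


# Event timestamps (ms) arriving out of order, with a 2-second out-of-orderness bound
for ts, wm, late in simulate_watermarks([1000, 3000, 2000, 7000, 1500], max_out_of_orderness_ms=2000):
    print(f"event at {ts:>5} ms -> watermark {wm:>6} ms, late={late}")
```

<span style="font-size:16px;">With a 2-second bound, the event at 2000 ms is still on time because the watermark has only reached 999 ms, while the event at 1500 ms arrives after the watermark has moved to 4999 ms and is flagged as late.</span>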

In [None]:
from typing import Tuple

from pyflink.common import Duration
from pyflink.common.typeinfo import Types
from pyflink.common.watermark_strategy import TimestampAssigner, WatermarkStrategy
from pyflink.datastream import StreamExecutionEnvironment

# Initialize the StreamExecutionEnvironment
env = StreamExecutionEnvironment.get_execution_environment()

# Sample data: (timestamp in milliseconds, value)
data = [
    (1000, "event1"),  # Event with timestamp 1000ms
    (2000, "event2"),
    (3000, "event3"),
    (4000, "event4")
]

# Define a custom TimestampAssigner
class MyTimestampAssigner(TimestampAssigner):
    def extract_timestamp(self, element: Tuple[int, str], record_timestamp: int) -> int:
        # Extract timestamp from the tuple (the first element)
        return element[0]

# Define a WatermarkStrategy that tolerates events up to 5 seconds out of order
watermark_strategy = WatermarkStrategy \
    .for_bounded_out_of_orderness(Duration.of_seconds(5)) \
    .with_timestamp_assigner(MyTimestampAssigner())

# Create a DataStream and assign timestamps and watermarks
data_stream = env.from_collection(collection=data, type_info=Types.TUPLE([Types.LONG(), Types.STRING()])) \
    .assign_timestamps_and_watermarks(watermark_strategy)

# Print the events (timestamps are attached to the records internally; print() shows only the values)
data_stream.print()

# Execute the Flink pipeline
env.execute("Watermark Example")

<span style="font-size:24px;">Basic Transformations in PyFlink<span>

<span style="font-size:16px;">Map<span>

<span style="font-size:16px;">DataStream → DataStream <span>

<span style="font-size:16px;">Takes one element and produces one element. A map function that doubles the values of the input stream:<span>

In [None]:
data_stream = env.from_collection(collection=[1, 2, 3, 4, 5])
data_stream.map(lambda x: 2 * x, output_type=Types.INT())

<span style="font-size:16px;">FlatMap<span>

<span style="font-size:16px;">DataStream → DataStream<span>

<span style="font-size:16px;">Takes one element and produces zero, one, or more elements. A flatmap function that splits sentences to words:<span>

In [None]:
data_stream = env.from_collection(collection=['hello apache flink', 'streaming compute'])
data_stream.flat_map(lambda x: x.split(' '), output_type=Types.STRING())

<span style="font-size:16px;">Filter<span>

<span style="font-size:16px;">DataStream → DataStream <span>

<span style="font-size:16px;">Evaluates a boolean function for each element and retains those for which the function returns true. A filter that filters out zero values:<span>

In [None]:
data_stream = env.from_collection(collection=[0, 1, 2, 3, 4, 5])
data_stream.filter(lambda x: x != 0)

<span style="font-size:16px;">KeyBy<span>

<span style="font-size:16px;">DataStream → KeyedStream<span>

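<span style="font-size:16px;">Logically partitions a stream into disjoint partitions. All records with the same key are assigned to the same partition. Internally, keyBy() is implemented with hash partitioning. In PyFlink, the key is specified with a key selector (a lambda or a KeySelector) that extracts the key from each record; the key's type can optionally be given with key_type.</span>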
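<span style="font-size:16px;">Hash partitioning can be sketched in plain Python. This is a simplified model, not PyFlink code: it assumes Flink's two-step scheme, where a key is first hashed into one of max_parallelism key groups (128 is assumed here) and contiguous ranges of key groups are then assigned to parallel subtasks. The CRC32 hash is a stand-in, so the subtask numbers will not match a real job; assign_subtask is a hypothetical helper.</span>

```python
import zlib


def assign_subtask(key, parallelism: int, max_parallelism: int = 128) -> int:
    """Map a key to a parallel subtask index using a two-step key-group scheme.

    CRC32 is used because it is deterministic across runs (Python's built-in
    hash() of a str is randomized); Flink uses a different (murmur) hash.
    """
    key_hash = zlib.crc32(str(key).encode("utf-8"))
    # Step 1: the key's hash picks one of max_parallelism key groups
    key_group = key_hash % max_parallelism
    # Step 2: key groups are split into contiguous ranges, one range per subtask
    return key_group * parallelism // max_parallelism


# The same sample records as in the key_by example, keyed by their second field
for record in [(1, 'a'), (2, 'a'), (3, 'b')]:
    print(record, "-> subtask", assign_subtask(record[1], parallelism=4))
```

<span style="font-size:16px;">Because the mapping depends only on the key, both records with key 'a' always land on the same subtask, which is what makes per-key state and aggregation possible.</span>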

In [None]:
data_stream = env.from_collection(collection=[(1, 'a'), (2, 'a'), (3, 'b')])
data_stream.key_by(lambda x: x[1], key_type=Types.STRING())

<span style="font-size:16px;">Reduce<span>

<span style="font-size:16px;">KeyedStream → DataStream<span>

<span style="font-size:16px;">A “rolling” reduce on a keyed data stream. Combines the current element with the last reduced value and emits the new value.<span>

<span style="font-size:16px;">A reduce function that creates a stream of partial sums:<span>
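<span style="font-size:16px;">The rolling behavior can be simulated in plain Python before running it in Flink. This sketch is not PyFlink code: a dictionary stands in for the per-key state in which Flink keeps the last reduced value, and every incoming record emits the updated value. rolling_reduce is a hypothetical helper, and the input is the same sample data as the cell below.</span>

```python
from typing import Any, Callable, Dict, Hashable, Iterable, List


def rolling_reduce(records: Iterable[Any],
                   key_selector: Callable[[Any], Hashable],
                   reduce_fn: Callable[[Any, Any], Any]) -> List[Any]:
    last_reduced: Dict[Hashable, Any] = {}  # key -> last reduced value (per-key state)
    emitted = []
    for record in records:
        key = key_selector(record)
        if key in last_reduced:
            # Combine the previous result for this key with the new record
            record = reduce_fn(last_reduced[key], record)
        last_reduced[key] = record
        emitted.append(record)  # a rolling reduce emits on every input record
    return emitted


partial_sums = rolling_reduce(
    [(1, 'a'), (2, 'a'), (3, 'a'), (4, 'b')],
    key_selector=lambda x: x[1],
    reduce_fn=lambda a, b: (a[0] + b[0], b[1]))
print(partial_sums)  # [(1, 'a'), (3, 'a'), (6, 'a'), (4, 'b')]
```

<span style="font-size:16px;">In a real job, the records for different keys may be printed by different subtasks, so their interleaving can differ, but the sequence of partial sums per key is the same.</span>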

In [None]:
data_stream = env.from_collection(collection=[(1, 'a'), (2, 'a'), (3, 'a'), (4, 'b')], type_info=Types.TUPLE([Types.INT(), Types.STRING()]))
data_stream.key_by(lambda x: x[1]).reduce(lambda a, b: (a[0] + b[0], b[1]))

<span style="font-size:24px;"> What are windows?<span>

<span style="font-size:16px;">Windows in PyFlink are used to split a stream of data into finite chunks, allowing for efficient processing of stream-based data. Stream processing is inherently unbounded, meaning that the data never stops flowing. Windows provide a way to group data into manageable subsets (based on time or count) so that operations like aggregation, counting, or summing can be performed on finite groups of data. This is essential for tasks like calculating running averages, aggregating counts over time intervals, and more.<span>

<span style="font-size:16px;">There are several types of windows in Flink:<span>

- <span style="font-size:16px;">Time Windows: These windows group events that occur within a specific time range. Time windows can be defined based on:<span>
    - <span style="font-size:16px;">Event Time: The time when the event was generated, which is typically used when the data source includes timestamps.<span>
    - <span style="font-size:16px;">Processing Time: The time when the event is processed by the system.<span>
    - <span style="font-size:16px;">Ingestion Time: The time when the event is ingested into the system.<span>

- <span style="font-size:16px;">Count Windows: These windows group events based on the number of elements. Once a window reaches a predefined number of elements, it triggers the computation and emits the result.</span>

- <span style="font-size:16px;">Session Windows: These windows group events that are close in time to each other. A session window is defined by an idle gap of time. If there is a gap longer than the session timeout, the current window will be closed, and a new session window will be created.<span>

- <span style="font-size:16px;">Global Windows: A special type of window that represents the entire stream as a single window. Typically used with custom window functions.<span>
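<span style="font-size:16px;">Count and session windows can be illustrated with a small plain-Python sketch. This is not PyFlink code and simplifies Flink's behavior: count windows are cut from a single ordered list, an incomplete trailing count window is not emitted, and a new session starts only when the distance to the previous event is longer than the gap, as described above (boundary handling in Flink may differ). count_windows and session_windows are hypothetical helpers.</span>

```python
from typing import List


def count_windows(values: List, size: int) -> List[List]:
    """Group values into consecutive windows of `size` elements.

    The incomplete trailing window never reaches `size`, so it never fires."""
    return [values[i:i + size] for i in range(0, len(values) - size + 1, size)]


def session_windows(timestamps: List[int], gap: int) -> List[List[int]]:
    """Group event timestamps into sessions separated by more than `gap`."""
    sessions: List[List[int]] = []
    for ts in sorted(timestamps):
        if sessions and ts - sessions[-1][-1] <= gap:
            sessions[-1].append(ts)  # close enough: extend the current session
        else:
            sessions.append([ts])  # idle gap exceeded: start a new session
    return sessions


print(count_windows([1, 2, 3, 4, 5, 6, 7], size=3))
print(session_windows([1000, 1500, 9000, 2000, 9500], gap=3000))
```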

<span style="font-size:16px;">Types of Windows in PyFlink<span>

- <span style="font-size:16px;">Tumbling Windows: Fixed-size, non-overlapping windows. Every event belongs to exactly one window. For example, a 10-second tumbling window groups all events whose timestamps fall into the same 10-second slot, and once that window closes, the next one starts.</span>

- <span style="font-size:16px;">Hopping Windows: Fixed-size windows that start every "hop" interval. When the hop is smaller than the window size, consecutive windows overlap, so one event can belong to several windows.
For example, a 10-second window with a 5-second hop starts a new window every 5 seconds, and each window covers 10 seconds of data.</span>

- <span style="font-size:16px;">Sliding Windows: Flink's name for hopping windows. A sliding window is defined by a window size and a slide (the hop); when the size is a multiple of the slide, each event belongs to size / slide windows, for example two windows for a 10-second size and a 5-second slide.</span>

- <span style="font-size:16px;">Global Window: This window type assigns all elements (per key, on a keyed stream) to a single window.
Because its default trigger never fires, a global window must be combined with a custom trigger (for example, one that fires every N elements) to produce any results.</span>
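<span style="font-size:16px;">The assignment of an event to tumbling and sliding windows comes down to simple arithmetic, sketched below in plain Python. As far as we know, this mirrors how Flink computes window start times when the window offset is zero; the helper names are hypothetical and the windows are half-open intervals [start, end) in milliseconds.</span>

```python
from typing import List, Tuple


def tumbling_window(ts: int, size: int) -> Tuple[int, int]:
    """Return the single tumbling window [start, end) that contains ts."""
    start = ts - ts % size
    return (start, start + size)


def sliding_windows(ts: int, size: int, slide: int) -> List[Tuple[int, int]]:
    """Return every sliding window [start, end) that contains ts, latest first."""
    windows = []
    start = ts - ts % slide  # latest window start that is <= ts
    while start > ts - size:  # the window still covers ts
        windows.append((start, start + size))
        start -= slide
    return windows


print(tumbling_window(12_000, size=10_000))
print(sliding_windows(12_000, size=10_000, slide=5_000))
```

<span style="font-size:16px;">An event at 12 s falls into exactly one 10-second tumbling window, [10 s, 20 s), but into two sliding windows with a 5-second slide, [10 s, 20 s) and [5 s, 15 s).</span>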

<span style="font-size:16px;">Window<span>

<span style="font-size:16px;">KeyedStream → WindowedStream<span>

<span style="font-size:16px;">Windows can be defined on already partitioned KeyedStreams. Windows group the data in each key according to some characteristic (e.g., the data that arrived within the last 5 seconds).<span>

In [None]:
data_stream.key_by(lambda x: x[1]).window(TumblingEventTimeWindows.of(Time.seconds(5)))

<span style="font-size:16px;">WindowAll<span>

<span style="font-size:16px;">DataStream → AllWindowedStream<span>

<span style="font-size:16px;">Windows can be defined on regular DataStreams. Windows group all the stream events according to some characteristic (e.g., the data that arrived within the last 5 seconds).<span> 

In [None]:
data_stream.window_all(TumblingEventTimeWindows.of(Time.seconds(5)))

<span style="font-size:16px;">Window Apply<span>

<span style="font-size:16px;">WindowedStream → DataStream<span>

<span style="font-size:16px;">AllWindowedStream → DataStream<span>

<span style="font-size:16px;">Applies a general function to the window as a whole. Below is a function that manually sums the elements of a window.<span>

In [None]:
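from typing import Iterable

from pyflink.common.time import Time
from pyflink.common.typeinfo import Types
from pyflink.common.watermark_strategy import TimestampAssigner, WatermarkStrategy
from pyflink.datastream.functions import WindowFunction, AllWindowFunction
from pyflink.datastream.window import TumblingEventTimeWindows, TimeWindow


class MyWindowFunction(WindowFunction[tuple, int, str, TimeWindow]):
    def apply(self, key: str, window: TimeWindow, inputs: Iterable[tuple]) -> Iterable[int]:
        # Sum the second field of every element in this key's window
        yield sum(element[1] for element in inputs)


class MyAllWindowFunction(AllWindowFunction[tuple, int, TimeWindow]):
    def apply(self, window: TimeWindow, inputs: Iterable[tuple]) -> Iterable[int]:
        # Same sum, over all elements in the window regardless of key
        yield sum(element[1] for element in inputs)


class ThirdFieldTimestampAssigner(TimestampAssigner):
    def extract_timestamp(self, value, record_timestamp):
        return value[2]  # event time in milliseconds


# Sample data: (key, value, event timestamp in ms)
events = env.from_collection(
    collection=[('a', 1, 1000), ('a', 2, 2000), ('b', 3, 3000), ('a', 4, 6000)],
    type_info=Types.TUPLE([Types.STRING(), Types.INT(), Types.LONG()])
).assign_timestamps_and_watermarks(
    WatermarkStrategy.for_monotonous_timestamps().with_timestamp_assigner(ThirdFieldTimestampAssigner()))

# applying a WindowFunction on a keyed window stream
windowed_stream = events.key_by(lambda x: x[0]).window(TumblingEventTimeWindows.of(Time.seconds(5)))
windowed_stream.apply(MyWindowFunction(), Types.INT()).print()

# applying an AllWindowFunction on non-keyed window stream
all_windowed_stream = events.window_all(TumblingEventTimeWindows.of(Time.seconds(5)))
all_windowed_stream.apply(MyAllWindowFunction(), Types.INT()).print()

env.execute("Window Apply Example")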

<span style="font-size:16px;">WindowReduce<span>

<span style="font-size:16px;">WindowedStream → DataStream<span>

<span style="font-size:16px;">Applies a functional reduce function to the window and returns the reduced value.<span>

In [None]:
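from pyflink.datastream.functions import ReduceFunction


class MyReduceFunction(ReduceFunction):

    def reduce(self, value1, value2):
        # Keep the key, add up the second fields, and carry over any remaining fields
        return (value1[0], value1[1] + value2[1], *value1[2:])


# windowed_stream: a keyed WindowedStream, such as the one used in the Window Apply cell
windowed_stream.reduce(MyReduceFunction()).print()
env.execute("Window Reduce Example")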

<span style="font-size:16px;">For complete examples, see the following folder in the Flink distribution:</span>

<span style="font-size:16px;">$FLINK_HOME/examples/python/<span>

<span style="font-size:24px;">States in PyFlink<span>

<span style="font-size:16px;">In Flink, state is used to store information across different events in a stream. State is essential when you need to keep track of intermediate computations or remember information from previously processed events, such as counts, sums, or the last processed event.<span>

<span style="font-size:16px;">There are two main types of state in Flink:<span>

<span style="font-size:16px;">Keyed State: When you perform operations on a stream that is partitioned by a key, Flink allows each key to have its own state. For example, if you're calculating the sum of values per user (where each user is a key), each user will have a separate state to hold their running total.
Keyed state is typically used with operations like key_by() where events are grouped by keys.<span>

<span style="font-size:16px;">Operator State: This type of state is not partitioned by key. Instead, it is bound to one parallel instance of an operator: each parallel instance keeps its own copy, which is shared by all the events that instance processes.
Operator state is useful when the state is not tied to any specific key; source connectors, for example, use it to remember the read positions of the partitions they consume.</span>

<span style="font-size:16px;">Keyed state comes in several types, the most common being:</span>

- <span style="font-size:16px;">Value State: Stores a single value (e.g., the sum of a set of numbers).</span>
- <span style="font-size:16px;">List State: Stores a collection of values (e.g., a list of all events for a specific key).</span>
- <span style="font-size:16px;">Map State: Stores a map of key-value pairs (e.g., a mapping of user IDs to the most recent event).</span>

In [None]:
# Key based state example

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.common.typeinfo import Types
from pyflink.datastream.functions import MapFunction
from pyflink.datastream.state import ValueStateDescriptor
from pyflink.common.time import Time

# Initialize the StreamExecutionEnvironment
env = StreamExecutionEnvironment.get_execution_environment()

# Sample data: (key, value)
data = [
    ('user1', 10),
    ('user2', 20),
    ('user1', 30),
    ('user2', 40),
    ('user1', 50)
]

# Define a MapFunction to maintain a running sum per key
class RunningSumFunction(MapFunction):
    def __init__(self):
        self.running_sum = None

    def open(self, runtime_context):
        # Create a value state descriptor to hold the sum for each key
        self.running_sum = runtime_context.get_state(ValueStateDescriptor("running_sum", Types.INT()))

    def map(self, value):
        # Get current sum from the state (return 0 if it's not initialized yet)
        current_sum = self.running_sum.value() or 0
        new_sum = current_sum + value[1]
        # Update the state with the new sum
        self.running_sum.update(new_sum)
        return (value[0], new_sum)

# Create a DataStream from the collection
data_stream = env.from_collection(collection=data, type_info=Types.TUPLE([Types.STRING(), Types.INT()]))

# Apply a keyed operation and use the RunningSumFunction
data_stream.key_by(lambda x: x[0]) \
    .map(RunningSumFunction()) \
    .print()

# Execute the Flink pipeline
env.execute("Running Sum Example")


8> ('user1', 10)
8> ('user1', 40)
8> ('user1', 90)
6> ('user2', 20)
6> ('user2', 60)


<pyflink.common.job_execution_result.JobExecutionResult at 0x16aa3b250>

In [None]:
# Keyed state example: count events per key with a KeyedProcessFunction

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import KeyedProcessFunction
from pyflink.datastream.state import ValueStateDescriptor
from pyflink.common.typeinfo import Types

# Initialize the StreamExecutionEnvironment
env = StreamExecutionEnvironment.get_execution_environment()

# Sample data: (key, value)
data = [
    ('user1', 10),
    ('user2', 20),
    ('user1', 30),
    ('user2', 40),
    ('user1', 50)
]

# Define a KeyedProcessFunction that keeps a per-key event count in keyed ValueState
class EventCountFunction(KeyedProcessFunction):
    def __init__(self):
        self.total_count = None

    def open(self, runtime_context):
        # Create a value state descriptor to hold the event count of the current key
        self.total_count = runtime_context.get_state(ValueStateDescriptor("total_count", Types.LONG()))

    def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
        # Get the current event count from the state (initialize to 0 if not set)
        current_count = self.total_count.value() or 0
        new_count = current_count + 1
        # Update the state with the new event count
        self.total_count.update(new_count)
        # Emit the original event along with the current count. process_element must
        # yield its results; returning a tuple would emit each field as a separate record.
        yield (value[0], value[1], new_count)

# Create a DataStream from the collection
data_stream = env.from_collection(collection=data, type_info=Types.TUPLE([Types.STRING(), Types.INT()]))

# Key the stream by the first field (user_id)
keyed_stream = data_stream.key_by(lambda x: x[0])

# Apply the per-key counting function
keyed_stream.process(EventCountFunction()).print()

# Execute the Flink pipeline
env.execute("Keyed State Event Count Example")


6> ('user2', 20, 1)
6> ('user2', 40, 2)
8> ('user1', 10, 1)
8> ('user1', 30, 2)
8> ('user1', 50, 3)


<pyflink.common.job_execution_result.JobExecutionResult at 0x16aa0bd90>

<span style="font-size:24px;"> Flink Web UI<span>

<span style="font-size:16px;">By default, when a cluster is running, you can access its Web UI at [http://localhost:8081](http://localhost:8081)</span>

<span style="font-size:16px;">If you are working on Google Colab, install pyngrok and see [Using pyngrok](https://pyngrok.readthedocs.io/en/latest/integrations.html)</span>

<span style="font-size:16px;">pip install pyngrok<span>

<span style="font-size:16px;">The Flink Web UI provides a comprehensive view of the cluster and job execution details. It is divided into three main sections:<span>

1. <span style="font-size:16px;">Dashboard Overview</span>

    - <span style="font-size:16px;">High-level statistics about the Flink cluster.<span>
    - <span style="font-size:16px;">Number of TaskManagers, slots, and currently running jobs.<span>
    - <span style="font-size:16px;">Overall cluster resource utilization (e.g., memory, CPU).<span>

2. <span style="font-size:16px;">Jobs View</span>

    - <span style="font-size:16px;">A list of all jobs in different states:</span>
        - <span style="font-size:16px;">Running: Active jobs being executed.<span>
        - <span style="font-size:16px;">Completed: Successfully finished jobs.<span>
        - <span style="font-size:16px;">Failed: Jobs that encountered errors during execution.<span>
        - <span style="font-size:16px;">Canceled: Jobs that were stopped manually or due to an issue.<span>
    - <span style="font-size:16px;">Job-specific details:<span>
        - <span style="font-size:16px;">Job ID: Unique identifier for the job.<span>
        - <span style="font-size:16px;">Execution Status: Current state (e.g., running, failed).<span>
        - <span style="font-size:16px;">Job Name: Descriptive label set during job execution.<span>

3. <span style="font-size:16px;">Job Details</span>

    - <span style="font-size:16px;">Graphical representation of the job’s execution plan (DAG).<span>
    - <span style="font-size:16px;">Details of each operator, such as parallelism and resource utilization.<span>
    - <span style="font-size:16px;">Subtask execution details:<span>
        - <span style="font-size:16px;">Metrics for each subtask (e.g., processed records, latency).<span>
        - <span style="font-size:16px;">Execution times and task throughput.<span>
        - <span style="font-size:16px;">Runtime statistics, including checkpointing details.<span>

<span style="font-size:24px;">Stream Analytics using PyFlink</span>

<span style="font-size:16px;">You are given a CSV file containing the website logs for the quiz you took. Using these logs, you have to identify all the cheaters.</span>

<span style="font-size:16px;">The CSV file has the following columns:</span>
1. <span style="font-size:16px;"> 'Time': The timestamp in the format "%d/%m/%y; %H:%M:%S".<span>
2. <span style="font-size:16px;"> 'UserId': The unique Id of the user.<span>
3. <span style="font-size:16px;"> 'Event context': The current screen/webpage visible to the user.<span>
4. <span style="font-size:16px;"> 'Component': Type of interface visible to the user.<span>
5. <span style="font-size:16px;"> 'Event name': The action performed by the user.<span>
6. <span style="font-size:16px;"> 'Description': A detailed description of the action.<span>

<span style="font-size:16px;"> ('Event context' column is a refinement of the 'Component' column.)<span>

<span style="font-size:16px;">We can conclude that someone is cheating if they answer a question significantly faster than the average time for that question. Concretely, a user is a cheater if their first response time for a question is less than 1/5 of the average response time for that question.</span>
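<span style="font-size:16px;">The rule itself is a single comparison, shown below as a minimal sketch. It assumes that first response times (in seconds) for one question have already been extracted from the logs; deriving them from the log columns, and doing so on a running stream, is the actual exercise. is_cheater and the sample values are hypothetical.</span>

```python
from typing import Dict, List


def is_cheater(first_response_s: float, average_response_s: float) -> bool:
    # Flag a first response faster than one fifth of the question's average
    return first_response_s < average_response_s / 5


# Hypothetical first-response times (seconds) of four users for one question
response_times: Dict[str, float] = {"u1": 120.0, "u2": 95.0, "u3": 14.0, "u4": 131.0}
average = sum(response_times.values()) / len(response_times)  # 90.0 s, so the threshold is 18 s
flagged: List[str] = [user for user, t in response_times.items() if is_cheater(t, average)]
print(flagged)  # ['u3']
```

<span style="font-size:16px;">Whether the average should include the user being checked, and how to update it while events keep arriving out of order, are design choices left to the exercise.</span>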

<span style="font-size:16px;"> You have to identify cheaters while the stream is running and keep a count of all the cheaters.<span>

<span style="font-size:16px;"> You can use PyFlink to solve this problem.<span>

<span style="font-size:16px;"> (Note: The timestamps may not be in order.)<span>