A good Data Engineer must not only be familiar with GUI-based tools, but also be able to develop efficient code for common data engineering activities such as the development and maintenance of ETL/ELT pipelines, data transformations, feature engineering to support model training, and insights generation.
Who Is A Junior Data Engineer?
Whilst the specific requirements (and the definition) of a Junior Data Engineer may vary depending on the organisation (and, in some cases, geography), the bare minimum characteristics are:
- Less than 3 years of relevant work experience as a Data Engineer.
- Build and maintain scalable end-to-end data pipelines and ETL/ELT processes.
- Good programming skills in one or more programming languages, such as Python, Scala, Rust, and Kotlin.
- Develop and maintain scripts and associated code for automating activities in data pipelines.
- Implement methods to improve data reliability and quality.
- Awareness of the 18 DataOps principles, especially:
  - Analytics as Code (briefly explained below)
  - Make it reproducible
Software Engineer Vs Data Engineer
While both involve programming, a Software Engineer focuses on application development whereas a Data Engineer focuses on data management. A Software Engineer builds software products, while a Data Engineer builds data products/platforms.
“… Data Engineers are Software Engineers specialising in building data intensive applications …”
[Job Posted: 17 Oct 2023, Location: Singapore]
In some organisations, the distinction between a Data Engineer and a Software Engineer is blurred.
Minimum Python Skills Expected
As of November 2023, the minimum Python skills expected from Junior Data Engineers are:
- Generators
- Iterators
- Object-Oriented Programming
- Regular Expressions
- Threading and Multiprocessing
- Unit Testing
- Profiling
(each of these topics is explained in a dedicated sub-section below)
Explanations
What Are Generators?
Generators are a type of Python function that behave like an iterator. Since they do not store all the values (to be iterated over) in memory at once, they help iterate over large datasets and seemingly infinite data sequences while allowing complex operations.
Why should a Data Engineer know about generators and use them?
Data engineers should learn about generators because they can help optimise their code’s performance.
- Generators enable lazy evaluation: the expression inside a generator is only evaluated when the next value is requested.
- This allows the processing of large datasets without loading everything into memory at once, which could otherwise cause performance issues.
- Processing data on the fly helps reduce memory usage and improve performance.
An Example
def process_data_stream():
    # Example generator function used to read events from a RESTful API
    while True:
        # Make the API call to fetch the event
        # NOTE: The details & exception handling omitted for the sake of brevity
        event_data = retrieve_data_from_api()  # Details omitted
        # .. perform complex processing of the raw event data
        yield event_data

# Calling the generator
event_stream = process_data_stream()
for event in event_stream:
    ...  # .. do something with the event
What Are Iterators?
Iterators are objects that can be used to step through a sequence of elements. They are used by Python’s for loops and other functions that operate on sequences of elements.
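An Example
A minimal sketch of the iterator protocol, showing what a for loop does implicitly via iter() and next():

record_ids = [101, 102, 103]

# Obtain an iterator over the list (a for loop does this behind the scenes)
iterator = iter(record_ids)

print(next(iterator))  # 101
print(next(iterator))  # 102
print(next(iterator))  # 103
# A further call to next(iterator) would raise StopIteration,
# which is how a for loop knows the sequence is exhausted.
# Note: every generator is also an iterator, which is why the
# generator example above can be consumed with a for loop.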
What Are Regular Expressions?
Regular expressions (RegEx) are a way to search for and manipulate patterns in text.
Why should a Data Engineer know about RegEx and use them?
A few (non-exhaustive) use cases where a Data Engineer might need to use RegEx:
- Data cleaning and preparation: RegEx can be used to remove unwanted characters, format text, and extract specific information from text.
- Data validation: RegEx can be used to validate data, such as ensuring that email addresses are in the correct format or that phone numbers have the correct number of digits, etc.
- Data transformation: RegEx can be used to transform data, such as converting text to uppercase or lowercase, removing spaces, replacing certain characters with others, etc.
- Working with log files: RegEx are often used to parse and analyze large log files, which can be especially productive when troubleshooting and debugging applications.
- Automating system administration tasks.
An Example
Given a log file with contents similar to
[2023-10-04 14:23:12] [DEBUG] Starting application
[2023-10-04 14:23:15] [DEBUG] Connecting to database
[2023-10-04 14:23:18] [DEBUG] Database connection established
[2023-10-04 14:23:23] [DEBUG] Processing record 1000
[2023-10-04 14:23:26] [DEBUG] Processing record 2000
[2023-10-04 14:23:28] [WARN] Unexpected values encountered in record ID 2543. Expected 45, got 54
[2023-10-04 14:23:30] [INFO] Processing completed. Processed 2875 records
[2023-10-04 14:23:31] [DEBUG] Application shutting down
The objective is to programmatically be alerted of instances where unexpected values are encountered.
import re
from typing import List

def batch_parse_log_file(
        log_file_path: str,
        batch_size: int = 1000):
    # Example helper function to parse a log file in batches of the indicated size
    # (default batch size is 1000 lines)
    with open(log_file_path, 'r') as log_file:
        current_batch = []
        for line in log_file:
            current_batch.append(line)
            # Check if the current batch is full
            if len(current_batch) == batch_size:
                # Yield the current batch and start a new batch
                yield current_batch
                current_batch = []
        # Yield any remaining lines in the batch
        if len(current_batch) > 0:
            yield current_batch

def find_unexpected_errors(lines: List[str]):
    for line in lines:
        # Use a regular expression to find lines similar to
        # "[2023-10-04 14:23:28] [WARN] Unexpected values encountered in record ID 2543. Expected 45, got 54"
        match = re.search(
            r'^\[(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\] \[WARN\] '
            r'Unexpected values encountered in record ID (\d+)\. Expected (\d+), got (\d+)',
            line)
        if match:
            timestamp = match.group(1)
            record_id = int(match.group(2))
            expected = int(match.group(3))
            actual = int(match.group(4))
            yield timestamp, record_id, expected, actual

if __name__ == "__main__":
    log_file_path = "log.txt"
    for batch in batch_parse_log_file(log_file_path):
        for timestamp, record_id, expected, actual in find_unexpected_errors(batch):
            print(f"Timestamp: {timestamp}")
            print(f"Record ID: {record_id}")
            print(f"Expected value: {expected}")
            print(f"Actual value: {actual}")
What Is Analytics as Code?
Analytics as Code (AaC) is a methodology that applies software development practices to analytics. It involves defining and managing analytics workflows and solutions using code rather than relying on point-and-click interfaces or manual configurations.
Key aspects and benefits of AaC include:
- Define analytics logic and processes in code: ETL/ELT pipelines, data transformations, model training/tuning, and insights generation are defined programmatically in languages such as Python or SQL, rather than relying only on GUI-based tools.
- Version Control and Collaboration: AaC enables the use of version control systems like Git to track changes, collaborate effectively, and ensure reproducibility. This makes it easier to maintain analytics solutions, revert to previous versions, and share work with others.
- Increased Agility and Scalability: By treating analytics as code, organizations can leverage DevOps practices to automate the deployment and maintenance of analytics solutions. This allows for faster development cycles, easier scaling to handle growing data volumes, and continuous integration with other software systems.
- Improved Reusability and Reproducibility: AaC promotes the creation of reusable analytics components and standardized workflows. This reduces duplication of effort, improves consistency across analyses, and makes it easier to adapt analytics solutions to new requirements.
- Reduced Errors and Risks: AaC encourages the use of automated testing and validation techniques, which helps identify and fix errors early in the development process. This reduces the risk of deploying flawed analytics that could lead to inaccurate insights or business decisions.
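An Example
A minimal sketch of the idea, where a data transformation and its unit test live together in a version-controlled Python file (the table, column, and function names here are purely hypothetical):

import pandas as pd

def add_revenue_column(orders: pd.DataFrame) -> pd.DataFrame:
    # Hypothetical transformation step: derive revenue from quantity and unit price
    orders = orders.copy()
    orders["revenue"] = orders["quantity"] * orders["unit_price"]
    return orders

def test_add_revenue_column():
    # A unit test that can run automatically (e.g. in CI) on every commit
    orders = pd.DataFrame({"quantity": [2, 3], "unit_price": [10.0, 5.0]})
    result = add_revenue_column(orders)
    assert result["revenue"].tolist() == [20.0, 15.0]

Because the transformation is plain code, it can be tracked in Git, reviewed, tested, and deployed like any other software artefact.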
What Is Object-Oriented Programming?
Object-oriented programming (OOP) is a fundamental programming paradigm that allows for the creation of reusable and maintainable code. Data engineers should have a strong understanding of OOP concepts such as classes, objects, inheritance, and polymorphism.
- Inheritance: OOP supports code reuse through inheritance of classes, which helps you write less code when creating variations of objects.
- Polymorphism: objects of different classes can be handled through common interfaces, which improves flexibility and makes programs easier to extend.
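An Example
A minimal sketch (with hypothetical class names) showing inheritance and polymorphism applied to reading records from different kinds of data sources:

import csv
import json

class DataSource:
    # Base class defining a common interface for all data sources
    def read_records(self, path: str) -> list:
        raise NotImplementedError

class CsvSource(DataSource):
    # Inherits from DataSource and provides a CSV-specific implementation
    def read_records(self, path: str) -> list:
        with open(path, newline="") as f:
            return list(csv.DictReader(f))

class JsonLinesSource(DataSource):
    # Another variation of DataSource, reusing the same interface
    def read_records(self, path: str) -> list:
        with open(path) as f:
            return [json.loads(line) for line in f]

def load_all(sources_and_paths) -> list:
    # Polymorphism: every source is handled through the common
    # read_records() interface, regardless of its concrete class
    records = []
    for source, path in sources_and_paths:
        records.extend(source.read_records(path))
    return records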
What Is Profiling Python Code?
- Profiling is the process of measuring the performance of a piece of code. It can be used to identify bottlenecks and optimise the code accordingly.
- Python includes a built-in profiler called cProfile, which reports the total running time, the time spent in each function, and the number of times each function was called, making it easy to determine where to focus optimisation efforts. Refer to the official Python documentation for more information and examples.
Why should a Data Engineer know about and profile Python code?
Profiling code creates the following opportunities:
- Identifying performance bottlenecks: profiling highlights inefficient code that causes slowdowns in data engineering tasks
- Resource utilization and optimization: profiling informs decisions on code restructuring and resource allocation
- Identifying scalability issues: optimized code improves the handling of large datasets
- Time Complexity: profiling helps measure and improve code execution time
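An Example
A minimal sketch using the built-in cProfile and pstats modules; transform_records below is just a hypothetical stand-in for a real data-processing step:

import cProfile
import pstats

def transform_records(n: int = 100_000) -> list:
    # Stand-in for a real transformation step
    return [str(i).zfill(10) for i in range(n)]

if __name__ == "__main__":
    profiler = cProfile.Profile()
    profiler.enable()
    transform_records()
    profiler.disable()

    # Report the functions that consumed the most cumulative time
    stats = pstats.Stats(profiler)
    stats.sort_stats("cumulative").print_stats(10)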
What Is Threading and Multiprocessing?
Threading and multiprocessing allow you to run multiple tasks simultaneously. This can be useful for speeding up computation-intensive tasks or for running tasks in parallel.
- Threading uses multiple threads within a single process, while multiprocessing utilizes separate processes that operate independently.
- Threads share the same memory space and run concurrently in a single process, whereas processes have independent memory space and operate in isolation on separate CPUs/cores. [#1], [#3]
- Threading is generally used for I/O-bound tasks (such as making API calls, or retrieving data from the database) that can benefit from running simultaneously without interference. Multiprocessing is preferred for CPU-intensive tasks to utilize multiple physical cores. [#2]
- Threads can raise concurrency issues such as race conditions due to shared state, whereas processes are largely isolated from one another. [#4]
When might a Data Engineer choose between multi-threading vs multi-processing?
A few scenarios where a Data Engineer might use multi-threading:
- Loading multiple data files simultaneously
- Running concurrent database queries or API calls to external services
- Crawling/scraping multiple websites simultaneously
A few scenarios where a Data Engineer might use multi-processing:
- Processing large datasets that don’t fit in memory by splitting work across processes
- Running computationally intensive ETL tasks like feature engineering in parallel
- Processing data streams from multiple devices simultaneously
- Scaling out simulations, Monte Carlo analyses or other CPU-bound work
In summary, a Data Engineer would reach for multithreading for I/O-bound tasks that can run concurrently, such as loading data or making requests. Multiprocessing would be used when true parallelism across CPU cores is needed, for example when processing large datasets or running computationally intensive tasks.
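An Example
A minimal sketch using Python's concurrent.futures module, which provides a common interface to both approaches; the URLs and the crunch() function below are purely illustrative:

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import urllib.request

URLS = [
    "https://example.com/data/1.json",  # illustrative endpoints only
    "https://example.com/data/2.json",
]

def fetch(url: str) -> bytes:
    # I/O-bound task: most of the time is spent waiting on the network
    with urllib.request.urlopen(url) as response:
        return response.read()

def crunch(n: int) -> int:
    # CPU-bound task: a placeholder for heavy computation such as feature engineering
    return sum(i * i for i in range(n))

if __name__ == "__main__":
    # Threads for I/O-bound work
    with ThreadPoolExecutor(max_workers=4) as pool:
        pages = list(pool.map(fetch, URLS))

    # Processes for CPU-bound work, to use multiple CPU cores
    with ProcessPoolExecutor(max_workers=4) as pool:
        results = list(pool.map(crunch, [10_000_000, 20_000_000]))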
What Is Unit Testing?
- Unit testing is a software development process in which individual units of code are tested to determine if they are working as expected. It is a critical part of developing high-quality software.
- It is an important step in the Analytics as Code methodology.
An Example
An example of a Python function used to clean and transform customer data, followed by the corresponding unit test.
import re
import pandas as pd

def check_if_numeric(s: str):
    # Helper function used to check if the string is a valid integer or float
    if re.match(r"^-?\d+$", s) or re.match(r"^-?\d+\.\d+$", s):
        return True
    # Check if the string is a comma-separated numeric value
    if re.match(r"^-?\d+(,\d+)*$", s):
        try:
            float(s.replace(",", ""))
            return True
        except ValueError:
            pass
    return False

def clean_and_transform_customer_data(data: pd.DataFrame):
    # Remove leading and trailing spaces from customer names
    data["name"] = data["name"].str.strip()
    # Convert customer IDs to numbers; invalid IDs become NaN
    data["customer_id"] = pd.to_numeric(data["customer_id"], errors="coerce")
    # Convert purchase amounts to floats
    data["purchase_amount"] = data["purchase_amount"]\
        .str.replace("$", "", regex=False)
    data["purchase_amount"] = data["purchase_amount"]\
        .apply(lambda x: float(x.replace(",", "")) if check_if_numeric(x) else None)
    # Fill in missing values for email addresses with "unknown@example.com"
    data["email"] = data["email"].fillna("unknown@example.com")
    # Convert genders to lowercase
    data["gender"] = data["gender"].str.lower()
    # Drop rows with invalid customer IDs
    data = data.dropna(subset=["customer_id"])
    # Drop rows with invalid purchase amounts
    data = data.dropna(subset=["purchase_amount"])
    return data

# Other functions omitted for the sake of brevity
The corresponding unit test code:
import pandas as pd
import pytest

# Assuming the function under test is importable, e.g.
# from customer_data_cleaning import clean_and_transform_customer_data  # hypothetical module name

def test_clean_and_transform_customer_data():
    # Create sample customer data with invalid entries
    data = pd.DataFrame({
        "name": [" John Doe ", "Jane Doe", None],
        "customer_id": ["123", "456", "InvalidID"],
        "purchase_amount": ["$123.45", "$567.89", "InvalidAmount"],
        "email": ["johndoe@example.com", None, "janedoe@example.org"],
        "gender": ["Male", "Female", "X"],
    })
    # Clean and transform the customer data
    cleaned_data = clean_and_transform_customer_data(data.copy())
    # Assertions to verify data cleaning and transformation
    assert cleaned_data["name"][0] == "John Doe"
    assert cleaned_data["customer_id"][0] == 123
    assert cleaned_data["purchase_amount"][0] == 123.45
    assert cleaned_data["email"][1] == "unknown@example.com"
    assert cleaned_data["gender"][1] == "female"
    # Next, ensure that the row with the invalid customer ID
    # (and invalid purchase amount) has been dropped
    assert 2 not in cleaned_data.index
    assert len(cleaned_data) == 2

if __name__ == "__main__":
    pytest.main()
References
- [#1] Multithreading vs. Multiprocessing: What’s the Difference? (Feb 2023)
- [#2] Multithreading and Multiprocessing in 10 Minutes (Mar 2022)
- [#3] Can a single process run in multiple cores? (Aug 2016)
- [#4] What is a race condition?
Credits
- All images used in this post were generated by the author using Image Creator powered by DALL·E 3
Next Steps
Join us for one of the upcoming guided learning sessions.