Python Automation: Complete Guide to Automating Tasks - Learn how to automate repetitive tasks with Python scripts, from file operations to web scraping and API interactions.
Tutorial

Python Automation: Complete Guide to Automating Tasks

Learn how to automate repetitive tasks with Python scripts, from file operations to web scraping and API interactions.

TechDevDex Team
12/8/2024
22 min
#Python #Automation #Scripting #Productivity #WebScraping

Python Automation: Complete Guide to Automating Tasks

Python is one of the best languages for automation due to its simplicity and extensive library ecosystem. This comprehensive guide will teach you how to automate various tasks using Python.

Table of Contents

  1. Getting Started with Python Automation
  2. File and Directory Operations
  3. Web Scraping and Automation
  4. API Automation
  5. Email Automation
  6. Database Automation
  7. Scheduling and Task Automation
  8. Advanced Automation Techniques

Getting Started with Python Automation

Essential Libraries

python
# Core automation libraries
import os
import shutil
import glob
import subprocess
import time
import datetime
import json
import csv

# Web automation
import requests
from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.webdriver.common.by import By

# Email automation
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart

# Database automation
import sqlite3
import pymongo
import psycopg2

# Task scheduling
import schedule
import croniter

Setting Up Your Environment

bash
# Create virtual environment
python -m venv automation_env
source automation_env/bin/activate  # Linux/Mac
# automation_env\Scripts\activate  # Windows

# Install essential packages
# Note: sqlite3 ships with Python's standard library, so it must not be
# installed with pip (there is no such package to install).
pip install requests beautifulsoup4 selenium
pip install schedule croniter python-dotenv
pip install pandas openpyxl
pip install psycopg2-binary pymongo

File and Directory Operations

Basic File Operations

python
import os
import shutil
from pathlib import Path

class FileManager:
    """Convenience wrapper for common file chores inside one base directory."""

    def __init__(self, base_path):
        """Remember ``base_path`` (string or path-like) as a ``Path``."""
        self.base_path = Path(base_path)

    def create_directory(self, dir_name):
        """Create ``dir_name`` below the base directory and return its path.

        Intermediate directories are created as needed, and an already
        existing directory is left untouched instead of raising.
        """
        new_dir = self.base_path / dir_name
        new_dir.mkdir(parents=True, exist_ok=True)
        print(f"Created directory: {new_dir}")
        return new_dir

    def copy_files(self, source_pattern, destination):
        """Copy every file matching ``source_pattern`` into ``destination``.

        Args:
            source_pattern: Glob pattern evaluated relative to the base directory.
            destination: Target directory, relative to the base directory.

        Returns:
            A list with the ``Path`` of every copy that was created.
        """
        dest_path = self.base_path / destination
        # Directories are skipped because shutil.copy2 only handles regular
        # files; files that already live in the destination are skipped
        # because copying a file onto itself raises SameFileError.
        source_files = [
            path for path in self.base_path.glob(source_pattern)
            if path.is_file() and path.parent != dest_path
        ]
        dest_path.mkdir(parents=True, exist_ok=True)

        copied = []
        for file in source_files:
            copied.append(Path(shutil.copy2(file, dest_path)))
            print(f"Copied: {file.name}")
        return copied

    def organize_files_by_extension(self):
        """Move each top-level file into a sub-folder named after its extension.

        Files without an extension go to ``no_extension``. A file is skipped
        (and reported) rather than overwritten when a file with the same name
        already exists in the target folder.

        Returns:
            The number of files that were moved.
        """
        # Snapshot the listing first: the loop creates directories and moves
        # entries inside the very directory that is being listed.
        files = [entry for entry in self.base_path.iterdir() if entry.is_file()]

        moved = 0
        for file in files:
            ext = file.suffix[1:] if file.suffix else 'no_extension'
            ext_dir = self.base_path / ext
            try:
                ext_dir.mkdir(exist_ok=True)
            except FileExistsError:
                # A regular file already occupies the folder name.
                print(f"Skipped {file.name}: {ext_dir} is not a directory")
                continue

            target = ext_dir / file.name
            if target.exists():
                print(f"Skipped {file.name}: already present in {ext}/")
                continue

            shutil.move(str(file), str(target))
            print(f"Moved {file.name} to {ext}/")
            moved += 1
        return moved

# Usage: create a folder, collect the PDFs, then sort everything by extension
fm = FileManager(base_path='/path/to/documents')
fm.create_directory(dir_name='organized_files')
fm.copy_files(source_pattern='*.pdf', destination='pdf_files')
fm.organize_files_by_extension()

Advanced File Operations

python
import hashlib
import zipfile
from datetime import datetime, timedelta

class AdvancedFileManager:
    """Duplicate detection, age-based cleanup and zip backups for one directory tree."""

    def __init__(self, base_path):
        """Remember ``base_path`` (string or path-like) as a ``Path``."""
        # Imported locally so this snippet also works when copied on its own.
        from pathlib import Path
        self.base_path = Path(base_path)

    def find_duplicate_files(self):
        """Find files below the base directory whose content is identical.

        Only files that share their size with at least one other file are
        hashed, because a file with a unique size cannot have a duplicate.

        Returns:
            A list of ``(duplicate_path, first_seen_path)`` tuples.
        """
        candidates = []
        size_counts = {}
        for file_path in self.base_path.rglob('*'):
            try:
                if not file_path.is_file():
                    continue
                size = file_path.stat().st_size
            except OSError as e:
                print(f"Error processing {file_path}: {e}")
                continue
            candidates.append((file_path, size))
            size_counts[size] = size_counts.get(size, 0) + 1

        file_hashes = {}
        duplicates = []
        for file_path, size in candidates:
            if size_counts[size] < 2:
                continue
            try:
                file_hash = self._calculate_hash(file_path)
            except OSError as e:
                print(f"Error processing {file_path}: {e}")
                continue
            if file_hash in file_hashes:
                duplicates.append((file_path, file_hashes[file_hash]))
            else:
                file_hashes[file_hash] = file_path

        return duplicates

    def _calculate_hash(self, file_path, chunk_size=8192):
        """Return the MD5 hex digest of ``file_path``, read in ``chunk_size`` pieces.

        MD5 is used here only as a content fingerprint, not for security.
        """
        hash_md5 = hashlib.md5()
        with open(file_path, "rb") as f:
            for chunk in iter(lambda: f.read(chunk_size), b""):
                hash_md5.update(chunk)
        return hash_md5.hexdigest()

    def cleanup_old_files(self, days=30):
        """Delete files whose modification time is older than ``days`` days.

        Returns:
            The number of files that were deleted.
        """
        cutoff_date = datetime.now() - timedelta(days=days)
        deleted_count = 0

        for file_path in self.base_path.rglob('*'):
            try:
                if not file_path.is_file():
                    continue
                file_time = datetime.fromtimestamp(file_path.stat().st_mtime)
                if file_time >= cutoff_date:
                    continue
                file_path.unlink()
            except OSError as e:
                print(f"Error deleting {file_path}: {e}")
                continue
            deleted_count += 1
            print(f"Deleted: {file_path}")

        print(f"Deleted {deleted_count} old files")
        return deleted_count

    def create_backup(self, backup_name=None):
        """Zip the whole directory tree into the parent of the base directory.

        Args:
            backup_name: Archive file name; defaults to a timestamped
                ``backup_YYYYmmdd_HHMMSS.zip``.

        Returns:
            The ``Path`` of the archive that was written.
        """
        if not backup_name:
            backup_name = f"backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}.zip"

        # Resolve first: for a relative base such as '.', ``parent`` is '.'
        # again and the archive would land inside the tree being archived.
        backup_path = self.base_path.resolve().parent / backup_name
        archive = backup_path.resolve()

        with zipfile.ZipFile(backup_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
            for file_path in self.base_path.rglob('*'):
                if not file_path.is_file():
                    continue
                # Never add the archive to itself (possible when the base
                # directory is a filesystem root or the name points inside it).
                if file_path.resolve() == archive:
                    continue
                zipf.write(file_path, file_path.relative_to(self.base_path))

        print(f"Backup created: {backup_path}")
        return backup_path

# Usage: look for duplicates, purge files older than 90 days, then archive
afm = AdvancedFileManager(base_path='/path/to/documents')
duplicates = afm.find_duplicate_files()
afm.cleanup_old_files(90)
afm.create_backup(backup_name=None)

Web Scraping and Automation

Basic Web Scraping

python
import requests
from bs4 import BeautifulSoup
import pandas as pd
import time
from urllib.parse import urljoin, urlparse

class WebScraper:
    """Small requests + BeautifulSoup scraper with a fixed delay between requests."""

    def __init__(self, base_url, delay=1, timeout=10):
        """Create a scraper session.

        Args:
            base_url: Site root, kept for reference by callers.
            delay: Seconds to pause after every request (rate limiting).
            timeout: Seconds to wait for the server before giving up; without
                a timeout a stalled server would block the script forever.
        """
        self.base_url = base_url
        self.delay = delay
        self.timeout = timeout
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        })

    def _fetch_soup(self, url):
        """Download ``url`` and return it parsed; raises on HTTP/network errors."""
        try:
            response = self.session.get(url, timeout=self.timeout)
            response.raise_for_status()
            return BeautifulSoup(response.content, 'html.parser')
        finally:
            # Pause after failed requests too, so retries stay rate-limited.
            time.sleep(self.delay)

    def scrape_links(self, url, selector='a'):
        """Return ``{'text', 'url'}`` dicts for every matching element with an href.

        Relative hrefs are resolved against ``url``. Returns ``[]`` on any error.
        """
        try:
            soup = self._fetch_soup(url)
            links = []
            for link in soup.select(selector):
                href = link.get('href')
                if href:
                    links.append({
                        'text': link.get_text(strip=True),
                        'url': urljoin(url, href)
                    })
            return links
        except Exception as e:  # deliberate best-effort boundary
            print(f"Error scraping {url}: {e}")
            return []

    def scrape_table_data(self, url, table_selector='table'):
        """Return one dict per data row of every matching HTML table.

        The first row of each table supplies the keys. Rows whose cell count
        differs from the header are ignored. Returns ``[]`` on any error.
        """
        try:
            soup = self._fetch_soup(url)
            all_data = []
            for table in soup.select(table_selector):
                rows = table.find_all('tr')
                if not rows:
                    # An empty table must not discard the other tables' data.
                    continue
                headers = [th.get_text(strip=True) for th in rows[0].find_all(['th', 'td'])]

                for row in rows[1:]:
                    row_data = [cell.get_text(strip=True) for cell in row.find_all(['td', 'th'])]
                    if len(row_data) == len(headers):
                        all_data.append(dict(zip(headers, row_data)))
            return all_data
        except Exception as e:  # deliberate best-effort boundary
            print(f"Error scraping table from {url}: {e}")
            return []

    def scrape_product_info(self, url, selectors):
        """Extract one text value per field using CSS selectors.

        Args:
            url: Page to download.
            selectors: Mapping of field name to CSS selector.

        Returns:
            Mapping of field name to stripped text, or ``None`` for fields
            whose selector matched nothing. Returns ``{}`` on any error.
        """
        try:
            soup = self._fetch_soup(url)
            product_data = {}
            for field, selector in selectors.items():
                element = soup.select_one(selector)
                product_data[field] = element.get_text(strip=True) if element else None
            return product_data
        except Exception as e:  # deliberate best-effort boundary
            print(f"Error scraping product from {url}: {e}")
            return {}

# Usage
scraper = WebScraper(base_url='https://example.com')

# Collect every link on a page
links = scraper.scrape_links(url='https://example.com/page')

# Turn the page's HTML tables into a list of row dicts
table_data = scraper.scrape_table_data(url='https://example.com/data')

# Pull individual product fields, one CSS selector per field
selectors = dict([
    ('title', 'h1.product-title'),
    ('price', '.price'),
    ('description', '.product-description'),
    ('rating', '.rating-value'),
])
product_info = scraper.scrape_product_info(url='https://example.com/product', selectors=selectors)

Selenium Web Automation

python
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.chrome.options import Options
import time

class WebAutomator:
    """Thin Selenium/Chrome wrapper for logins, form filling and dynamic pages.

    Can be used as a context manager so the browser is always closed:
    ``with WebAutomator(headless=True) as automator: ...``
    """

    def __init__(self, headless=False, timeout=10):
        """Start a Chrome session.

        Args:
            headless: Run Chrome without a visible window.
            timeout: Seconds every explicit wait may take before failing.
        """
        self.driver = None
        self.headless = headless
        self.timeout = timeout
        self.setup_driver()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, tb):
        self.close()
        return False

    def setup_driver(self):
        """Setup Chrome driver with options"""
        chrome_options = Options()
        if self.headless:
            chrome_options.add_argument('--headless')
        chrome_options.add_argument('--no-sandbox')
        chrome_options.add_argument('--disable-dev-shm-usage')
        chrome_options.add_argument('--disable-gpu')

        # No implicit wait is configured: Selenium warns against mixing
        # implicit and explicit waits, so this class uses explicit waits only.
        self.driver = webdriver.Chrome(options=chrome_options)

    def _wait_for(self, selector):
        """Wait until ``selector`` is present in the DOM and return the element."""
        return WebDriverWait(self.driver, self.timeout).until(
            EC.presence_of_element_located((By.CSS_SELECTOR, selector))
        )

    def login(self, url, username, password, username_selector, password_selector,
              submit_selector, success_selector=None):
        """Open ``url``, fill in the credentials and submit the form.

        Args:
            url: Login page address.
            username, password: Credentials to type.
            username_selector, password_selector, submit_selector: CSS
                selectors of the two inputs and the submit control.
            success_selector: Optional CSS selector that only exists after a
                successful login. When given, the method waits for it;
                otherwise it pauses three seconds and assumes success.

        Returns:
            ``True`` when all steps ran without error, ``False`` otherwise.
        """
        try:
            self.driver.get(url)

            username_field = self._wait_for(username_selector)
            username_field.clear()
            username_field.send_keys(username)

            password_field = self._wait_for(password_selector)
            password_field.clear()
            password_field.send_keys(password)

            self._wait_for(submit_selector).click()

            if success_selector:
                self._wait_for(success_selector)
            else:
                # No way to verify the outcome; give the page time to settle.
                time.sleep(3)
            return True

        except Exception as e:  # deliberate best-effort boundary
            print(f"Login failed: {e}")
            return False

    def fill_form(self, form_data):
        """Type each value of ``form_data`` into the field matched by its CSS selector key.

        A field that cannot be filled is reported and skipped.
        """
        for selector, value in form_data.items():
            try:
                element = self._wait_for(selector)
                element.clear()
                element.send_keys(value)
                time.sleep(0.5)
            except Exception as e:
                print(f"Error filling field {selector}: {e}")

    def scrape_dynamic_content(self, url, wait_for_selector):
        """Load ``url``, wait for ``wait_for_selector`` and return the parsed page.

        Returns:
            A ``BeautifulSoup`` document, or ``None`` on any error.
        """
        # Imported locally so this snippet also works when copied on its own.
        from bs4 import BeautifulSoup

        try:
            self.driver.get(url)
            self._wait_for(wait_for_selector)
            # Parse the DOM only after the dynamic content has been rendered.
            return BeautifulSoup(self.driver.page_source, 'html.parser')

        except Exception as e:  # deliberate best-effort boundary
            print(f"Error scraping dynamic content: {e}")
            return None

    def close(self):
        """Close the browser; safe to call more than once."""
        if self.driver:
            self.driver.quit()
            self.driver = None

# Usage
automator = WebAutomator(headless=True)

try:
    # Login automation
    login_success = automator.login(
        'https://example.com/login',
        'username',
        'password',
        '#username',
        '#password',
        '#login-button'
    )

    # Form filling
    form_data = {
        '#name': 'John Doe',
        '#email': 'john@example.com',
        '#phone': '123-456-7890'
    }
    automator.fill_form(form_data)
finally:
    # Always shut the browser down, even if a step above raises; otherwise
    # the Chrome process keeps running in the background.
    automator.close()

API Automation

REST API Automation

python
import requests
import json
from datetime import datetime
import time

class APIAutomator:
    """Minimal JSON REST client built on a shared ``requests`` session."""

    def __init__(self, base_url, api_key=None, timeout=10):
        """Create the client.

        Args:
            base_url: API root, with or without a trailing slash.
            api_key: Optional bearer token sent with every request.
            timeout: Seconds to wait for a response before giving up.
        """
        self.base_url = base_url
        self.timeout = timeout
        self.session = requests.Session()

        if api_key:
            self.session.headers.update({
                'Authorization': f'Bearer {api_key}',
                'Content-Type': 'application/json'
            })

    def _url(self, endpoint):
        """Join base URL and endpoint with exactly one slash between them."""
        return f"{self.base_url.rstrip('/')}/{str(endpoint).lstrip('/')}"

    def _send(self, method, endpoint, **kwargs):
        """Issue the request and raise for network errors and 4xx/5xx answers."""
        kwargs.setdefault('timeout', getattr(self, 'timeout', 10))
        response = self.session.request(method, self._url(endpoint), **kwargs)
        response.raise_for_status()
        return response

    @staticmethod
    def _decode(response):
        """Return the JSON body, or ``{}`` for a successful empty body (e.g. 204)."""
        if not response.content:
            return {}
        return response.json()

    def get_data(self, endpoint, params=None):
        """GET ``endpoint``; returns the decoded JSON or ``None`` on failure."""
        try:
            return self._decode(self._send('GET', endpoint, params=params))
        except (requests.exceptions.RequestException, ValueError) as e:
            print(f"GET request failed: {e}")
            return None

    def post_data(self, endpoint, data):
        """POST ``data`` as JSON; returns the decoded JSON or ``None`` on failure."""
        try:
            return self._decode(self._send('POST', endpoint, json=data))
        except (requests.exceptions.RequestException, ValueError) as e:
            print(f"POST request failed: {e}")
            return None

    def update_data(self, endpoint, data):
        """PUT ``data`` as JSON; returns the decoded JSON or ``None`` on failure."""
        try:
            return self._decode(self._send('PUT', endpoint, json=data))
        except (requests.exceptions.RequestException, ValueError) as e:
            print(f"PUT request failed: {e}")
            return None

    def delete_data(self, endpoint):
        """DELETE ``endpoint``; returns ``True`` for any successful status.

        Servers legitimately answer a delete with 200, 202 or 204, so every
        non-error status counts as success.
        """
        try:
            return bool(self._send('DELETE', endpoint).ok)
        except requests.exceptions.RequestException as e:
            print(f"DELETE request failed: {e}")
            return False

    def batch_operations(self, operations, delay=0.5):
        """Run several operations in order and collect their results.

        Args:
            operations: Iterable of dicts with ``method``, ``endpoint`` and an
                optional ``data`` entry (used as query parameters for GET and
                as the JSON body for POST/PUT).
            delay: Seconds to pause after each operation (rate limiting).

        Returns:
            One dict per operation with ``operation``, ``result`` and
            ``timestamp``. An unsupported method yields ``result`` ``None``.
        """
        results = []
        for operation in operations:
            method = operation['method'].upper()
            endpoint = operation['endpoint']
            data = operation.get('data')

            if method == 'GET':
                result = self.get_data(endpoint, data)
            elif method == 'POST':
                result = self.post_data(endpoint, data)
            elif method == 'PUT':
                result = self.update_data(endpoint, data)
            elif method == 'DELETE':
                result = self.delete_data(endpoint)
            else:
                # Without this branch ``result`` would be unbound, or worse,
                # silently carry over the previous operation's result.
                print(f"Unsupported method: {method}")
                result = None

            results.append({
                'operation': operation,
                'result': result,
                'timestamp': datetime.now().isoformat()
            })

            # Rate limiting
            if delay:
                time.sleep(delay)

        return results

# Usage
api = APIAutomator(base_url='https://api.example.com', api_key='your-api-key')

# Fetch the user collection
users = api.get_data(endpoint='users')

# Create a user
new_user = api.post_data(
    endpoint='users',
    data=dict([('name', 'John Doe'), ('email', 'john@example.com')]),
)

# Run one request of each kind, in order
operations = [
    dict(method='GET', endpoint='users'),
    dict(method='POST', endpoint='users', data={'name': 'Jane Doe'}),
    dict(method='PUT', endpoint='users/1', data={'name': 'John Smith'}),
    dict(method='DELETE', endpoint='users/2'),
]
results = api.batch_operations(operations=operations)

Email Automation

Email Sending Automation

python
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.base import MIMEBase
from email import encoders
import os
from datetime import datetime

class EmailAutomator:
    """Send plain-text/HTML emails, optionally with attachments, over SMTP + STARTTLS."""

    def __init__(self, smtp_server, smtp_port, username, password):
        """Store the connection settings; no network access happens here."""
        self.smtp_server = smtp_server
        self.smtp_port = smtp_port
        self.username = username
        self.password = password
        self.session = None

    def connect(self):
        """Open the SMTP connection, upgrade it to TLS and log in.

        Returns:
            ``True`` on success, ``False`` otherwise.
        """
        session = None
        try:
            session = smtplib.SMTP(self.smtp_server, self.smtp_port)
            session.starttls()
            session.login(self.username, self.password)
        except Exception as e:
            print(f"SMTP connection failed: {e}")
            # Do not leak the socket when STARTTLS or the login fails.
            if session is not None:
                try:
                    session.close()
                except Exception:
                    pass
            return False
        self.session = session
        return True

    def _build_message(self, to_email, subject, body, html_body=None, attachments=None):
        """Assemble the MIME message without sending it.

        Text and HTML bodies form a ``multipart/alternative`` part. When
        attachments are given that part is wrapped in ``multipart/mixed``,
        because attachments are not alternatives of the body and mail clients
        may hide them otherwise.
        """
        content = MIMEMultipart('alternative')
        content.attach(MIMEText(body, 'plain'))
        if html_body:
            content.attach(MIMEText(html_body, 'html'))

        if attachments:
            msg = MIMEMultipart('mixed')
            msg.attach(content)
        else:
            msg = content

        msg['From'] = self.username
        msg['To'] = to_email
        msg['Subject'] = subject

        for attachment_path in attachments or []:
            if not os.path.isfile(attachment_path):
                print(f"Attachment not found, skipped: {attachment_path}")
                continue
            part = MIMEBase('application', 'octet-stream')
            with open(attachment_path, "rb") as attachment:
                part.set_payload(attachment.read())
            encoders.encode_base64(part)
            # Passing ``filename`` as a keyword lets the email package quote
            # and encode the name correctly (spaces, non-ASCII characters).
            part.add_header(
                'Content-Disposition',
                'attachment',
                filename=os.path.basename(attachment_path)
            )
            msg.attach(part)

        return msg

    def send_email(self, to_email, subject, body, html_body=None, attachments=None):
        """Send one email, connecting first if necessary.

        Args:
            to_email: Recipient address.
            subject: Subject line.
            body: Plain-text body.
            html_body: Optional HTML alternative of the body.
            attachments: Optional iterable of file paths to attach.

        Returns:
            ``True`` when the message was handed to the server, else ``False``.
        """
        if not self.session:
            if not self.connect():
                return False

        try:
            msg = self._build_message(to_email, subject, body, html_body, attachments)
            self.session.send_message(msg)
            print(f"Email sent to {to_email}")
            return True

        except smtplib.SMTPServerDisconnected as e:
            # Forget the dead session so the next call reconnects.
            self.session = None
            print(f"Failed to send email: {e}")
            return False
        except Exception as e:
            print(f"Failed to send email: {e}")
            return False

    def send_bulk_emails(self, email_list, subject_template, body_template,
                         html_template=None, delay=1):
        """Send one templated email per entry of ``email_list``.

        Args:
            email_list: Dicts holding at least ``email`` plus every
                placeholder used by the templates.
            subject_template, body_template, html_template: ``str.format``
                templates filled from each dict.
            delay: Seconds to pause between messages (rate limiting).

        Returns:
            The number of emails sent successfully.
        """
        # Imported locally so this snippet also works when copied on its own.
        import time

        success_count = 0

        for email_data in email_list:
            try:
                subject = subject_template.format(**email_data)
                body = body_template.format(**email_data)
                html_body = html_template.format(**email_data) if html_template else None
                recipient = email_data['email']
            except (KeyError, IndexError) as e:
                # One incomplete record must not abort the whole batch.
                print(f"Skipped entry with missing field {e}: {email_data}")
                continue

            if self.send_email(recipient, subject, body, html_body):
                success_count += 1

            # Rate limiting
            if delay:
                time.sleep(delay)

        print(f"Sent {success_count}/{len(email_list)} emails successfully")
        return success_count

    def close(self):
        """Close the SMTP connection; safe to call more than once."""
        if self.session:
            try:
                self.session.quit()
            except smtplib.SMTPException:
                pass
            finally:
                self.session = None

# Usage
email_automator = EmailAutomator(
    'smtp.gmail.com',
    587,
    'your-email@gmail.com',
    'your-app-password'
)

try:
    # Send simple email
    email_automator.send_email(
        'recipient@example.com',
        'Test Email',
        'This is a test email from Python automation.'
    )

    # Send HTML email
    html_body = """
<html>
<body>
    <h2>Welcome to Our Service!</h2>
    <p>Thank you for signing up. We're excited to have you on board.</p>
    <p>Best regards,<br>The Team</p>
</body>
</html>
"""

    email_automator.send_email(
        'recipient@example.com',
        'Welcome Email',
        'Welcome to our service!',
        html_body
    )

    # Send bulk emails
    email_list = [
        {'name': 'John Doe', 'email': 'john@example.com'},
        {'name': 'Jane Smith', 'email': 'jane@example.com'}
    ]

    subject_template = "Hello {name}, Welcome!"
    body_template = "Hi {name},\n\nWelcome to our service!"
    html_template = "<h1>Hello {name}!</h1><p>Welcome to our service!</p>"

    email_automator.send_bulk_emails(email_list, subject_template, body_template, html_template)
finally:
    # Always end the SMTP session, even if sending fails part-way through.
    email_automator.close()

Database Automation

SQLite Automation

python
import sqlite3
import pandas as pd
from datetime import datetime

class DatabaseAutomator:
    """Small SQLite helper: create tables, insert rows, query and export to CSV.

    Can be used as a context manager so the connection is always closed:
    ``with DatabaseAutomator('automation.db') as db: ...``
    """

    def __init__(self, db_path):
        """Store the database path; the connection is opened lazily."""
        self.db_path = db_path
        self.connection = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, tb):
        self.close()
        return False

    @staticmethod
    def _check_identifier(name):
        """Return ``name`` if it is a plain SQL identifier, else raise ``ValueError``.

        Table and column names cannot be bound as ``?`` parameters, so they
        are validated before being interpolated into a statement.
        """
        import re
        if not isinstance(name, str) or not re.fullmatch(r'[A-Za-z_][A-Za-z0-9_]*', name):
            raise ValueError(f"Invalid SQL identifier: {name!r}")
        return name

    def connect(self):
        """Connect to database"""
        try:
            self.connection = sqlite3.connect(self.db_path)
            # Row objects allow access by column name and convert to dicts.
            self.connection.row_factory = sqlite3.Row
            return True
        except sqlite3.Error as e:
            print(f"Database connection failed: {e}")
            return False

    def create_table(self, table_name, columns):
        """Create ``table_name`` if it does not exist yet.

        Args:
            table_name: Plain identifier (letters, digits, underscore).
            columns: Dicts with ``name``, ``type`` and optional
                ``constraints``. ``type`` and ``constraints`` are inserted
                verbatim and must come from trusted code.

        Returns:
            ``True`` on success, ``False`` otherwise.
        """
        if not self.connection:
            if not self.connect():
                return False

        try:
            table = self._check_identifier(table_name)
            columns_str = ', '.join(
                f"{self._check_identifier(col['name'])} {col['type']} "
                f"{col.get('constraints', '')}".strip()
                for col in columns
            )
            query = f"CREATE TABLE IF NOT EXISTS {table} ({columns_str})"

            # The connection context manager commits on success and rolls
            # back when the statement raises.
            with self.connection:
                self.connection.execute(query)
            print(f"Table {table_name} created successfully")
            return True

        except (sqlite3.Error, ValueError, KeyError, TypeError, AttributeError) as e:
            print(f"Error creating table: {e}")
            return False

    def insert_data(self, table_name, data):
        """Insert one row (dict) or several rows (list of dicts).

        For a list, the keys of the first row define the columns; all rows
        are inserted in one transaction, so a failure inserts nothing. An
        empty list is a successful no-op.

        Returns:
            ``True`` on success, ``False`` otherwise.
        """
        if not self.connection:
            if not self.connect():
                return False

        try:
            table = self._check_identifier(table_name)

            if isinstance(data, (list, tuple)):
                rows = list(data)
                if not rows:
                    print(f"No records to insert into {table_name}")
                    return True
            else:
                rows = [data]

            columns = [self._check_identifier(col) for col in rows[0].keys()]
            placeholders = ', '.join('?' for _ in columns)
            query = f"INSERT INTO {table} ({', '.join(columns)}) VALUES ({placeholders})"
            values = [tuple(row[col] for col in columns) for row in rows]

            with self.connection:
                self.connection.executemany(query, values)

            if isinstance(data, (list, tuple)):
                print(f"Inserted {len(rows)} records into {table_name}")
            else:
                print(f"Inserted 1 record into {table_name}")
            return True

        except (sqlite3.Error, ValueError, KeyError, TypeError, AttributeError) as e:
            print(f"Error inserting data: {e}")
            return False

    def query_data(self, query, params=None):
        """Run ``query`` and return the rows as a list of dicts.

        Args:
            query: SQL text; use ``?`` placeholders for values.
            params: Optional sequence of values for the placeholders.

        Returns:
            A list of dicts (possibly empty), or ``None`` on failure.
        """
        if not self.connection:
            if not self.connect():
                return None

        try:
            cursor = self.connection.cursor()
            if params:
                cursor.execute(query, params)
            else:
                cursor.execute(query)

            return [dict(row) for row in cursor.fetchall()]

        except sqlite3.Error as e:
            print(f"Query execution failed: {e}")
            return None

    def export_to_csv(self, table_name, output_file):
        """Write all rows of ``table_name`` to ``output_file`` as CSV.

        Returns:
            ``True`` when at least one row was exported, ``False`` otherwise.
        """
        try:
            query = f"SELECT * FROM {self._check_identifier(table_name)}"
            data = self.query_data(query)

            if data:
                df = pd.DataFrame(data)
                df.to_csv(output_file, index=False)
                print(f"Exported {len(data)} records to {output_file}")
                return True
            else:
                print(f"No data found in {table_name}")
                return False

        except (OSError, ValueError) as e:
            print(f"Export failed: {e}")
            return False

    def close(self):
        """Close the database connection; safe to call more than once."""
        if self.connection:
            self.connection.close()
            self.connection = None

# Usage
db_automator = DatabaseAutomator('automation.db')

try:
    # Create table
    columns = [
        {'name': 'id', 'type': 'INTEGER', 'constraints': 'PRIMARY KEY AUTOINCREMENT'},
        {'name': 'name', 'type': 'TEXT', 'constraints': 'NOT NULL'},
        {'name': 'email', 'type': 'TEXT', 'constraints': 'UNIQUE'},
        {'name': 'created_at', 'type': 'TIMESTAMP', 'constraints': 'DEFAULT CURRENT_TIMESTAMP'}
    ]
    db_automator.create_table('users', columns)

    # Insert data
    user_data = [
        {'name': 'John Doe', 'email': 'john@example.com'},
        {'name': 'Jane Smith', 'email': 'jane@example.com'}
    ]
    db_automator.insert_data('users', user_data)

    # Query data
    users = db_automator.query_data("SELECT * FROM users WHERE name LIKE ?", ('%John%',))
    print(users)

    # Export to CSV
    db_automator.export_to_csv('users', 'users_export.csv')
finally:
    # Always release the database file, even if a step above raises.
    db_automator.close()

Scheduling and Task Automation

Task Scheduling

python
import schedule
import time
from datetime import datetime
import threading

class TaskScheduler:
    """Run ``schedule`` jobs in a background thread that can be stopped promptly."""

    def __init__(self, check_interval=60):
        """Create an idle scheduler.

        Args:
            check_interval: Seconds between two checks for due jobs.
        """
        self.running = False
        self.thread = None
        self.check_interval = check_interval
        # An Event lets stop() interrupt the wait between checks immediately.
        self._stop_event = threading.Event()
        self._configured = False

    def daily_backup(self):
        """Daily backup task"""
        print(f"Starting daily backup at {datetime.now()}")
        # Your backup logic here
        print("Daily backup completed")

    def weekly_report(self):
        """Weekly report generation"""
        print(f"Generating weekly report at {datetime.now()}")
        # Your report generation logic here
        print("Weekly report generated")

    def hourly_cleanup(self):
        """Hourly cleanup task"""
        print(f"Running hourly cleanup at {datetime.now()}")
        # Your cleanup logic here
        print("Hourly cleanup completed")

    def setup_schedule(self):
        """Register the jobs once; further calls do nothing.

        Registering again would make every job run twice per slot.
        """
        if self._configured:
            return

        # Daily tasks
        schedule.every().day.at("02:00").do(self.daily_backup)

        # Weekly tasks
        schedule.every().monday.at("09:00").do(self.weekly_report)

        # Hourly tasks
        schedule.every().hour.do(self.hourly_cleanup)

        self._configured = True
        print("Schedule setup completed")

    def _loop(self):
        """Run due jobs until the stop event is set."""
        while not self._stop_event.is_set():
            try:
                schedule.run_pending()
            except Exception as e:
                # A failing job must not terminate the scheduler loop.
                print(f"Scheduled job failed: {e}")
            # Returns early as soon as stop() sets the event.
            self._stop_event.wait(self.check_interval)
        self.running = False

    def run_scheduler(self):
        """Run the scheduler loop in the calling thread until ``stop()`` is called."""
        self._stop_event.clear()
        self.running = True
        self._loop()

    def start(self):
        """Start the scheduler in a daemon thread; no-op when already running."""
        if self.thread is not None and self.thread.is_alive():
            print("Scheduler already running")
            return

        self.setup_schedule()
        # State is set here, not inside the thread, so a stop() issued right
        # after start() cannot be overwritten by the thread starting up late.
        self._stop_event.clear()
        self.running = True
        self.thread = threading.Thread(target=self._loop)
        self.thread.daemon = True
        self.thread.start()
        print("Scheduler started")

    def stop(self):
        """Stop the scheduler and wait for the background thread to finish."""
        self.running = False
        self._stop_event.set()
        if self.thread is not None:
            # Joining the current thread would deadlock (stop() from a job).
            if self.thread is not threading.current_thread():
                self.thread.join()
            self.thread = None
        print("Scheduler stopped")

# Usage: start the background scheduler, then idle until the user hits Ctrl+C
scheduler = TaskScheduler()
scheduler.start()

# Keep the main thread alive: start() launches a daemon thread, and daemon
# threads are terminated as soon as the main thread exits.
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    # Ctrl+C arrives here as KeyboardInterrupt; shut the scheduler down.
    scheduler.stop()

Advanced Automation Techniques

Error Handling and Logging

python
import logging
from functools import wraps
import traceback
from datetime import datetime

class AutomationLogger:
    """Log to a file and to the console through the shared 'automation' logger."""

    def __init__(self, log_file='automation.log'):
        """Configure the logger, attaching each handler only once.

        ``logging.getLogger('automation')`` returns the same object on every
        call, so adding handlers unconditionally would duplicate every log
        line each time this class is instantiated.
        """
        # Imported locally so this snippet also works when copied on its own.
        import os

        self.logger = logging.getLogger('automation')
        self.logger.setLevel(logging.INFO)

        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )

        target = os.path.abspath(log_file)
        has_file_handler = any(
            isinstance(handler, logging.FileHandler) and handler.baseFilename == target
            for handler in self.logger.handlers
        )
        # FileHandler subclasses StreamHandler, so exclude it explicitly.
        has_console_handler = any(
            isinstance(handler, logging.StreamHandler)
            and not isinstance(handler, logging.FileHandler)
            for handler in self.logger.handlers
        )

        if not has_file_handler:
            file_handler = logging.FileHandler(log_file)
            file_handler.setLevel(logging.INFO)
            file_handler.setFormatter(formatter)
            self.logger.addHandler(file_handler)

        if not has_console_handler:
            console_handler = logging.StreamHandler()
            console_handler.setLevel(logging.INFO)
            console_handler.setFormatter(formatter)
            self.logger.addHandler(console_handler)

    def log_error(self, error, context=None):
        """Log ``error`` with optional ``context`` and, if available, its traceback.

        The traceback is taken from the exception object itself, so it is
        correct even when this method is called outside an ``except`` block.
        """
        error_msg = f"Error: {str(error)}"
        if context:
            error_msg += f" | Context: {context}"

        self.logger.error(error_msg)

        tb = getattr(error, '__traceback__', None)
        if tb is not None:
            self.logger.error(
                ''.join(traceback.format_exception(type(error), error, tb))
            )

    def log_info(self, message):
        """Log info message"""
        self.logger.info(message)

    def log_success(self, message):
        """Log success message"""
        self.logger.info(f"SUCCESS: {message}")

def error_handler(logger):
    """Build a decorator that reports each call's outcome through ``logger``.

    A call that finishes normally is reported with ``logger.log_success``
    and its return value is passed through. Any ``Exception`` is reported
    with ``logger.log_error`` and the call evaluates to ``None`` instead of
    raising.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            outcome = None
            try:
                outcome = func(*args, **kwargs)
                logger.log_success(f"{func.__name__} completed successfully")
            except Exception as exc:
                logger.log_error(exc, f"Function: {func.__name__}")
                outcome = None
            return outcome
        return wrapper
    return decorator

# Usage
automation_logger = AutomationLogger()


@error_handler(logger=automation_logger)
def risky_operation():
    """Demonstrate a task whose failures are logged instead of raised."""
    automation_logger.log_info("Starting risky operation")
    # Your risky code here
    return "Operation completed"


# Call it: `result` is the return value, or None if the operation failed
result = risky_operation()

Configuration Management

python
import json
import os
from typing import Dict, Any

class ConfigManager:
    """Load, query and update a JSON configuration file using dotted keys."""

    def __init__(self, config_file='config.json'):
        """Load ``config_file``, creating it with defaults when it is missing."""
        self.config_file = config_file
        self.config = self.load_config()

    def load_config(self) -> Dict[str, Any]:
        """Return the configuration stored in the file.

        A missing file is created with the default configuration. An
        unreadable file, or one whose top-level JSON value is not an object,
        yields an empty configuration.
        """
        if not os.path.exists(self.config_file):
            return self.create_default_config()

        try:
            with open(self.config_file, 'r', encoding='utf-8') as f:
                loaded = json.load(f)
        except (OSError, ValueError) as e:
            print(f"Error loading config: {e}")
            return {}

        if not isinstance(loaded, dict):
            # get()/set() navigate nested dicts; anything else is unusable.
            print(f"Error loading config: expected a JSON object in {self.config_file}")
            return {}
        return loaded

    def create_default_config(self) -> Dict[str, Any]:
        """Write the default configuration to the file and return it."""
        default_config = {
            'database': {
                'host': 'localhost',
                'port': 5432,
                'name': 'automation_db'
            },
            'email': {
                'smtp_server': 'smtp.gmail.com',
                'smtp_port': 587,
                'username': '',
                'password': ''
            },
            'automation': {
                'delay': 1,
                'retry_attempts': 3,
                'log_level': 'INFO'
            }
        }
        self.save_config(default_config)
        return default_config

    def save_config(self, config: Dict[str, Any]):
        """Write ``config`` to the file as indented JSON; errors are reported, not raised."""
        try:
            with open(self.config_file, 'w', encoding='utf-8') as f:
                json.dump(config, f, indent=4)
            print(f"Configuration saved to {self.config_file}")
        except (OSError, TypeError, ValueError) as e:
            print(f"Error saving config: {e}")

    def get(self, key: str, default=None):
        """Return the value at dotted ``key`` (e.g. ``'database.host'``).

        ``default`` is returned when any segment of the path is missing.
        """
        value = self.config

        for k in key.split('.'):
            if isinstance(value, dict) and k in value:
                value = value[k]
            else:
                return default

        return value

    def set(self, key: str, value: Any):
        """Set the value at dotted ``key`` and save the configuration.

        Missing intermediate sections are created. An intermediate entry
        that holds a non-dict value is replaced by a new section, because a
        scalar cannot contain nested keys.
        """
        keys = key.split('.')
        config = self.config

        for k in keys[:-1]:
            if not isinstance(config.get(k), dict):
                config[k] = {}
            config = config[k]

        config[keys[-1]] = value
        self.save_config(self.config)

# Usage
config = ConfigManager(config_file='config.json')

# Read values with dotted keys; the second argument is the fallback
db_host = config.get(key='database.host', default='localhost')
delay = config.get(key='automation.delay', default=1)

# Write values; each call saves the file immediately
config.set(key='database.host', value='new-host.com')
config.set(key='automation.delay', value=2)

Conclusion

Python automation is a powerful skill that can save you hours of repetitive work. This guide covered the essential techniques for:

  • File Operations: Organizing, backing up, and managing files
  • Web Automation: Scraping data and automating web interactions
  • API Integration: Automating API calls and data processing
  • Email Automation: Sending bulk emails and notifications
  • Database Operations: Automating database tasks
  • Task Scheduling: Running automated tasks on schedule
  • Error Handling: Robust error handling and logging

Best Practices

  1. Start Small: Begin with simple tasks and gradually increase complexity
  2. Error Handling: Always implement proper error handling
  3. Logging: Log all operations for debugging and monitoring
  4. Testing: Test your automation scripts thoroughly
  5. Security: Be careful with sensitive data and credentials
  6. Documentation: Document your automation processes

Next Steps

  • Explore Libraries: Check out libraries like pandas, openpyxl, schedule
  • Cloud Automation: Learn about cloud automation with AWS, Azure, or GCP
  • Advanced Techniques: Study machine learning automation and AI integration
  • Monitoring: Implement monitoring and alerting for your automation

Happy automating! 🐍