Python实用速查表:日常任务的高效编程指南 一、引言 这份速查表源于实际需求。
我将各个部分划分为逻辑连贯的区域,这些区域通常协同工作。如此一来,可以迅速跳转到感兴趣的部分,并找到与特定任务或主题最为相关的内容。 这其中包括文件操作、API交互、电子表格处理、数学计算以及对列表和字典等数据结构的操作。此外,我还会着重介绍一些实用的库,以增强您的Python工具包,这些库在Python的常见应用领域中广泛使用。
二、使用文件 1. 读取文件 要读取文件的全部内容:
1 2 3 4 5 with open ('example.txt' , 'r' ) as file: content = file.read() print (content)
2. 写入文件 要将文本写入文件,覆盖现有内容:
1 2 3 with open ('example.txt' , 'w' ) as file: file.write('Hello, Python!' )
3. 追加到文件 要将文本添加到现有文件的末尾:
1 2 3 with open ('example.txt' , 'a' ) as file: file.write('\nAppend this line.' )
4. 将行读入列表 要将文件逐行读入列表:
1 2 3 4 with open ('example.txt' , 'r' ) as file: lines = file.readlines() print (lines)
5. 迭代文件中的每一行 要处理文件中的每一行:
1 2 3 4 with open ('example.txt' , 'r' ) as file: for line in file: print (line.strip())
6. 检查文件是否存在 要在执行文件操作之前检查文件是否存在:
1 2 3 4 5 6 import osif os.path.exists('example.txt' ): print ('File exists.' )else : print ('File does not exist.' )
7. 将列表写入文件 要将列表的每个元素写入文件中的新行:
1 2 3 4 5 6 lines = ['First line' , 'Second line' , 'Third line' ]with open ('example.txt' , 'w' ) as file: for line in lines: file.write(f'{line} \n' )
8. 对多个文件使用with块 要使用with
块同时处理多个文件:
1 2 3 4 with open ('source.txt' , 'r' ) as source, open ('destination.txt' , 'w' ) as destination: content = source.read() destination.write(content)
9. 删除文件 要安全删除文件(如果存在):
1 2 3 4 5 6 7 import osif os.path.exists('example.txt' ): os.remove('example.txt' ) print ('File deleted.' )else : print ('File does not exist.' )
10. 读取和写入二进制文件 要以二进制模式读取和写入文件(对图像、视频等有用):
1 2 3 4 5 6 with open ('image.jpg' , 'rb' ) as file: content = file.read()with open ('copy.jpg' , 'wb' ) as file: file.write(content)
三、使用简单的HTTP API 1. 基本GET请求 要使用GET请求从API端点获取数据:
1 2 3 4 5 6 import requests response = requests.get('https://api.example.com/data' ) data = response.json() print (data)
2. 带有查询参数的GET请求 要发送包含查询参数的GET请求:
1 2 3 4 5 6 7 import requests params = {'key1' : 'value1' , 'key2' : 'value2' } response = requests.get('https://api.example.com/search' , params=params) data = response.json()print (data)
3. 处理HTTP错误 要优雅地处理可能的HTTP错误:
1 2 3 4 5 6 7 8 9 10 import requests response = requests.get('https://api.example.com/data' )try : response.raise_for_status() data = response.json() print (data)except requests.exceptions.HTTPError as err: print (f'HTTP Error: {err} ' )
4. 设置请求的超时时间 要为API请求设置超时以避免无限期挂起:
1 2 3 4 5 6 7 8 import requeststry : response = requests.get('https://api.example.com/data' , timeout=5 ) data = response.json() print (data)except requests.exceptions.Timeout: print ('The request timed out' )
5. 在请求中使用标头 要在请求中包含标头(例如,用于授权):
1 2 3 4 5 6 7 import requests headers = {'Authorization' : 'Bearer YOUR_ACCESS_TOKEN' } response = requests.get('https://api.example.com/protected' , headers=headers) data = response.json()print (data)
6. 带有JSON有效负载的POST请求 要使用带有JSON负载的POST请求将数据发送到API端点:
1 2 3 4 5 6 7 8 import requests payload = {'key1' : 'value1' , 'key2' : 'value2' } headers = {'Content-Type' : 'application/json' } response = requests.post('https://api.example.com/submit' , json=payload, headers=headers)print (response.json())
7. 处理响应编码 要正确处理响应编码:
1 2 3 4 5 6 7 8 import requests response = requests.get('https://api.example.com/data' ) response.encoding = 'utf-8' data = response.textprint (data)
8. 将会话与请求一起使用 要使用session对象向同一主机发出多个请求,以提高性能:
1 2 3 4 5 6 7 8 import requestswith requests.Session() as session: session.headers.update({'Authorization' : 'Bearer YOUR_ACCESS_TOKEN' }) response = session.get('https://api.example.com/data' ) print (response.json())
9. 处理重定向 要处理或禁用请求中的重定向:
1 2 3 4 import requests response = requests.get('https://api.example.com/data' , allow_redirects=False )print (response.status_code)
10. 流式处理大型响应 要流式传输大型响应以块形式处理,而不是将其全部加载到内存中:
1 2 3 4 5 6 import requests response = requests.get('https://api.example.com/large-data' , stream=True )for chunk in response.iter_content(chunk_size=1024 ): process(chunk)
四、使用列表 1. 创建列表 要创建一个列表:
1 2 elements = ['Earth' , 'Air' , 'Fire' , 'Water' ]
2. 追加到列表 要将新元素追加到列表末尾:
1 2 elements.append('Aether' )
3. 插入到列表 要在列表中的特定位置插入元素:
1 2 elements.insert(1 , 'Spirit' )
4. 从列表中删除 要从列表中按值删除元素:
1 2 elements.remove('Earth' )
5. 从列表中弹出一个元素 要删除并返回给定索引处的元素(默认为最后一项):
1 2 last_element = elements.pop()
6. 查找元素的索引 要查找元素第一次出现的索引:
1 2 index_of_air = elements.index('Air' )
7. 列表切片 要对列表进行切片,获取子列表:
1 2 sub_elements = elements[1 :4 ]
8. 列表推导 要通过将表达式应用于现有列表的每个元素来创建新列表:
1 2 lengths = [len (element) for element in elements]
9. 对列表进行排序 要按升序对列表进行排序(就地):
10. 反转列表 要就地反转列表的元素:
五、使用字典 1. 创建词典 要创建一个新词典:
1 2 elements = {'Hydrogen' : 'H' , 'Helium' : 'He' , 'Lithium' : 'Li' }
2. 添加或更新条目 要添加新条目或更新现有条目:
1 2 elements['Carbon' ] = 'C'
3. 删除条目 要从字典中删除条目:
1 2 del elements['Lithium' ]
4. 检查Key是否存在 要检查键是否存在于字典中:
1 2 3 if 'Helium' in elements: print ('Helium is present' )
5. 迭代Key 要遍历字典中的键:
1 2 3 for element in elements: print (element)
6. 迭代值 要遍历字典中的值:
1 2 3 for symbol in elements.values(): print (symbol)
7. 迭代项目 要同时遍历字典的键和值:
1 2 3 for element, symbol in elements.items(): print (f'{element} : {symbol} ' )
8. 字典理解 要通过对可迭代对象施咒来创建新字典:
1 2 squares = {x: x**2 for x in range (5 )}
9. 合并词典 要合并两个或多个词典,形成它们条目的新联盟:
1 2 3 4 5 alchemists = {'Paracelsus' : 'Mercury' } philosophers = {'Plato' : 'Aether' } merged = {**alchemists, **philosophers}
10. 使用default获取值 要安全地检索值,为缺失的键提供默认值:
1 2 element = elements.get('Neon' , 'Unknown' )
六、使用操作系统 1. 导航文件路径 要制作和剖析路径,确保跨操作系统的兼容性:
1 2 3 4 5 6 7 import os path = os.path.join('mystic' , 'forest' , 'artifact.txt' ) directory = os.path.dirname(path) artifact_name = os.path.basename(path)
2. 列出目录内容 要揭示目录中的所有文件和子目录:
1 2 3 4 import os contents = os.listdir('enchanted_grove' )print (contents)
3. 创建目录 要在文件系统中创建新目录:
1 2 3 4 5 import os os.mkdir('alchemy_lab' ) os.makedirs('alchemy_lab/potions/elixirs' )
4. 删除文件和目录 要删除文件或目录:
1 2 3 4 5 6 7 8 import os os.remove('unnecessary_scroll.txt' ) os.rmdir('abandoned_hut' )import shutil shutil.rmtree('cursed_cavern' )
5. 执行Shell命令 要直接从Python调用Shell命令:
1 2 3 4 5 6 import subprocess result = subprocess.run(['echo' , 'Revealing the arcane' ], capture_output=True , text=True ) print (result.stdout)
6. 使用环境变量 要读取和设置环境变量:
1 2 3 4 5 import os path = os.environ.get('PATH' ) os.environ['MAGIC' ] = 'Arcane'
7. 更改当前工作目录 要将当前工作目录切换到文件系统中的另一个目录:
1 2 3 import os os.chdir('arcane_library' )
8. 路径存在和类型 要辨别路径的存在及其性质(是文件还是目录):
1 2 3 4 5 6 7 import os exists = os.path.exists('mysterious_ruins' ) is_directory = os.path.isdir('mysterious_ruins' ) is_file = os.path.isfile('ancient_manuscript.txt' )
9. 使用临时文件 要创建临时文件和目录:
1 2 3 4 5 6 7 import tempfile temp_file = tempfile.NamedTemporaryFile(delete=False )print (temp_file.name) temp_dir = tempfile.TemporaryDirectory()print (temp_dir.name)
10. 获取系统信息 要获取有关主机系统的信息,包括其名称和支持的特性:
1 2 3 4 5 6 import osimport platform os_name = os.name system_info = platform.system()
七、使用CLI(命令行界面) 1. 读取用户输入 从STDIN获取输入:
1 2 3 4 user_input = input ("Impart your wisdom: " )print (f"You shared: {user_input} " )
2. 打印到STDOUT 要将消息打印到控制台:
1 2 print ("Behold, the message of the ancients!" )
3. 格式打印 要优雅而精确地将变量嵌入到消息中:
1 2 3 4 5 name = "Merlin" age = 300 print (f"{name} , of {age} years, speaks of forgotten lore." )
4. 从STDIN读取行 从STDIN逐行修剪空格:
1 2 3 4 5 import sysfor line in sys.stdin: print (f"Echo from the void: {line.strip()} " )
5. 写信给STDERR 要向STDERR发送消息:
1 2 3 import sys sys.stderr.write("Beware! The path is fraught with peril.\n" )
6. 重定向STDOUT 要重定向STDOUT:
1 2 3 4 5 6 7 8 9 import sys original_stdout = sys.stdoutwith open ('mystic_log.txt' , 'w' ) as f: sys.stdout = f print ("This message is inscribed within the mystic_log.txt." ) sys.stdout = original_stdout
7. 重定向STDERR 要重定向STDERR:
1 2 3 4 5 import syswith open ('warnings.txt' , 'w' ) as f: sys.stderr = f print ("This warning is sealed within warnings.txt." , file=sys.stderr)
8. 提示输入密码 要提示输入密码:
1 2 3 import getpass secret_spell = getpass.getpass("Whisper the secret spell: " )
9. 命令行参数 使用和解析命令行参数:
1 2 3 4 5 import sys script, first_arg, second_arg = sys.argvprint (f"Invoked with the sacred tokens: {first_arg} and {second_arg} " )
10. 使用Argparse进行复杂的CLI交互 添加描述和选项/参数:
1 2 3 4 5 6 7 8 9 10 11 import argparse parser = argparse.ArgumentParser(description="Invoke the ancient scripts." ) parser.add_argument('spell' , help ="The spell to cast" ) parser.add_argument('--power' , type =int , help ="The power level of the spell" ) args = parser.parse_args()print (f"Casting {args.spell} with power {args.power} " )
八、使用数学运算和排列 1. 基本算术运算 要执行基本算术运算:
1 2 3 4 5 6 7 8 9 10 11 12 sum_result = 7 + 3 difference = 7 - 3 product = 7 * 3 quotient = 7 / 3 remainder = 7 % 3 power = 7 ** 3
2. 使用复数 要使用复数:
1 2 3 4 5 6 7 8 z = complex (2 , 3 ) real_part = z.real imaginary_part = z.imag conjugate = z.conjugate()
3. 数学函数 常见的数学函数:
1 2 3 4 5 6 7 import math root = math.sqrt(16 ) logarithm = math.log(100 , 10 ) sine = math.sin(math.pi / 2 )
4. 生成排列 从给定集合生成排列的简单方法:
1 2 3 4 5 6 from itertools import permutations paths = permutations([1 , 2 , 3 ])for path in paths: print (path)
5. 生成组合 生成组合的简单方法:
1 2 3 4 5 6 from itertools import combinations combos = combinations([1 , 2 , 3 , 4 ], 2 )for combo in combos: print (combo)
6. 随机数生成 要获取随机数:
1 2 3 import random num = random.randint(1 , 100 )
7. 使用分数 当需要处理分数时:
1 2 3 4 5 from fractions import Fraction f = Fraction(3 , 4 )print (f + 1 )
8. 统计函数 要获取平均值、中位数和标准差:
1 2 3 4 5 6 7 8 import statistics data = [1 , 2 , 3 , 4 ] mean = statistics.mean(data) median = statistics.median(data) std_dev = statistics.stdev(data)
9. 矩阵运算 要执行基本的矩阵运算:
1 2 3 4 5 6 7 8 import numpy as np matrix_a = np.array([[1 , 2 ], [3 , 4 ]]) matrix_b = np.array([[5 , 6 ], [7 , 8 ]]) sum_matrix = matrix_a + matrix_b product_matrix = np.dot(matrix_a, matrix_b)
10. 三角函数 要使用三角函数:
1 2 3 4 5 6 7 import math sin_value = math.sin(math.pi / 2 ) cos_value = math.cos(math.pi) radians = math.radians(180 )
九、使用日期和时间 1. 获取当前日期和时间 1 2 3 4 5 6 from datetime import datetime now = datetime.now() formatted_date = now.strftime("%Y-%m-%d %H:%M:%S" ) print (formatted_date)
2. 日期计算 1 2 3 4 5 6 7 from datetime import datetime, timedelta today = datetime.now() one_week_later = today + timedelta(days=7 ) difference = one_week_later - today
3. 时区处理 1 2 3 4 5 6 7 8 from datetime import datetimeimport pytz timezone = pytz.timezone('Asia/Shanghai' ) local_time = datetime.now(timezone) utc_time = local_time.astimezone(pytz.UTC)
4. 解析日期字符串 1 2 3 4 5 6 7 8 9 from datetime import datetime date_string = "2024-01-01 12:00:00" parsed_date = datetime.strptime(date_string, "%Y-%m-%d %H:%M:%S" ) year = parsed_date.year month = parsed_date.month day = parsed_date.day
十、使用正则表达式 1. 基本模式匹配 1 2 3 4 5 6 7 import re text = "Python programming is fun" match = re.search(r'Python' , text) if match : print ("Found Python!" )
2. 提取信息 1 2 3 4 5 6 import re text = "The price is $19.99 and quantity is 5" numbers = re.findall(r'\d+\.?\d*' , text) print (numbers)
3. 替换文本 1 2 3 4 5 6 import re text = "I love cats, cats are great" new_text = re.sub(r'cats' , 'dogs' , text) print (new_text)
4. 分割文本 1 2 3 4 5 6 import re text = "apple,banana;orange:grape" fruits = re.split(r'[,;:]' , text) print (fruits)
5. 验证格式 1 2 3 4 5 6 7 8 9 10 11 import redef is_valid_email (email ): pattern = r'^[\w\.-]+@[\w\.-]+\.\w+$' return bool (re.match (pattern, email))print (is_valid_email("user@example.com" )) print (is_valid_email("invalid.email" ))
十一、多线程与多进程 1. 多线程基础 多线程允许程序在同一进程内并发执行多个线程,每个线程独立运行且共享进程资源,适用于I/O密集型任务(如网络请求、文件读写等),可提高程序响应速度。
2. 使用threading
模块
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 import threadingdef print_numbers (): for i in range (1 , 6 ): print (i)def print_letters (): for letter in 'abcde' : print (letter) thread1 = threading.Thread(target=print_numbers) thread2 = threading.Thread(target=print_letters) thread1.start() thread2.start() thread1.join() thread2.join()
线程同步(锁) :当多个线程访问共享资源时,可能会出现数据不一致问题,需使用锁进行同步。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 import threading counter = 0 lock = threading.Lock()def increment (): global counter for _ in range (100000 ): lock.acquire() try : counter += 1 finally : lock.release() threads = []for _ in range (5 ): thread = threading.Thread(target=increment) threads.append(thread) thread.start()for thread in threads: thread.join()print (counter)
3. 多进程基础 多进程将任务分配到多个进程中并行执行,每个进程有独立的内存空间,适用于计算密集型任务(如数学计算、图像处理等),可充分利用多核CPU优势提高计算速度。
4. 使用multiprocessing
模块
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 from multiprocessing import Processdef square (x ): return x ** 2 if __name__ == '__main__' : processes = [] for i in range (1 , 6 ): process = Process(target=square, args=(i,)) processes.append(process) process.start() for process in processes: process.join()
进程间通信(使用Queue
) :进程间无法直接共享数据,需使用Queue
等方式进行通信。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 from multiprocessing import Process, Queuedef producer (queue ): for i in range (1 , 6 ): queue.put(i)def consumer (queue ): while True : if queue.empty(): break item = queue.get() print (item ** 2 )if __name__ == '__main__' : queue = Queue() p1 = Process(target=producer, args=(queue,)) p2 = Process(target=consumer, args=(queue,)) p1.start() p1.join() p2.start() p2.join()
5. 线程池与进程池 线程池和进程池可管理多个线程或进程,重复利用线程或进程执行任务,减少创建和销毁开销,提高性能。
6. 使用concurrent.futures
模块
1 2 3 4 5 6 7 8 9 10 11 12 13 from concurrent.futures import ThreadPoolExecutordef print_number (number ): print (number)with ThreadPoolExecutor(max_workers=3 ) as executor: numbers = [1 , 2 , 3 , 4 , 5 ] futures = [executor.submit(print_number, number) for number in numbers] for future in futures: future.result()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 from concurrent.futures import ProcessPoolExecutordef square (x ): return x ** 2 if __name__ == '__main__' : with ProcessPoolExecutor(max_workers=2 ) as executor: numbers = [1 , 2 , 3 , 4 , 5 ] futures = [executor.submit(square, number) for number in numbers] for future in futures: print (future.result())
7. 多线程与多进程的选择
选择依据 :根据任务类型选择,I/O密集型任务优先考虑多线程,计算密集型任务优先考虑多进程。同时需考虑资源限制、任务复杂性和同步难度等因素。
注意事项 :多线程需注意线程安全问题,合理使用锁;多进程需注意进程间通信开销,避免过度创建进程导致系统资源耗尽。同时,要注意不同操作系统对多线程和多进程的支持差异。
十二、数据库操作 1. 数据库操作概述 在Python中,数据库操作是许多应用程序的重要组成部分。无论是Web开发、数据分析还是自动化脚本,与数据库交互都至关重要。Python提供了丰富的库和工具,使得连接和操作各种数据库变得相对容易。以下将介绍常见数据库的操作,包括MySQL、PostgreSQL等,并展示如何将查询结果转换为Pandas数据格式,以便进行进一步的数据分析和处理。
2. MySQL数据库操作 连接到MySQL数据库 使用mysql-connector-python
库来连接MySQL数据库。首先确保已安装该库(pip install mysql-connector-python
)。
1 2 3 4 5 6 7 8 9 10 11 12 import mysql.connector mydb = mysql.connector.connect( host="localhost" , user="your_user" , password="your_password" , database="your_database" ) mycursor = mydb.cursor()
** 插入数据** 向数据库表中插入新数据。
1 2 3 4 5 6 7 8 9 10 11 sql = "INSERT INTO your_table (column1, column2) VALUES (%s, %s)" val = ("value1" , "value2" ) mycursor.execute(sql, val) mydb.commit()print (mycursor.rowcount, "记录插入成功。" )
查询数据 从数据库表中检索数据。
1 2 3 4 5 6 7 8 9 10 11 sql = "SELECT * FROM your_table" mycursor.execute(sql) results = mycursor.fetchall()for row in results: print (row)
更新数据 修改数据库表中已存在的数据。
1 2 3 4 5 6 7 8 9 10 11 sql = "UPDATE your_table SET column1 = %s WHERE column2 = %s" val = ("new_value" , "condition_value" ) mycursor.execute(sql, val) mydb.commit()print (mycursor.rowcount, "记录被更新。" )
删除数据 从数据库表中删除数据。
1 2 3 4 5 6 7 8 9 10 11 sql = "DELETE FROM your_table WHERE condition_column = %s" val = ("condition_value" ,) mycursor.execute(sql, val) mydb.commit()print (mycursor.rowcount, "记录被删除。" )
3. PostgreSQL数据库操作 连接到PostgreSQL数据库 使用psycopg2
库连接PostgreSQL数据库(pip install psycopg2
)。
1 2 3 4 5 6 7 8 9 10 11 12 import psycopg2 conn = psycopg2.connect( dbname="your_database" , user="your_user" , password="your_password" , host="your_host" ) cur = conn.cursor()
插入数据 向PostgreSQL数据库表插入数据。
1 2 3 4 5 6 7 8 9 10 11 sql = "INSERT INTO your_table (column1, column2) VALUES (%s, %s)" val = ("value1" , "value2" ) cur.execute(sql, val) conn.commit()print (cur.rowcount, "记录插入成功。" )
查询数据 从PostgreSQL数据库表中检索数据。
1 2 3 4 5 6 7 8 9 10 11 sql = "SELECT * FROM your_table" cur.execute(sql) results = cur.fetchall()for row in results: print (row)
更新数据 更新PostgreSQL数据库表中的数据。
1 2 3 4 5 6 7 8 9 10 11 sql = "UPDATE your_table SET column1 = %s WHERE column2 = %s" val = ("new_value" , "condition_value" ) cur.execute(sql, val) conn.commit()print (cur.rowcount, "记录被更新。" )
删除数据 从PostgreSQL数据库表中删除数据。
1 2 3 4 5 6 7 8 9 10 11 sql = "DELETE FROM your_table WHERE condition_column = %s" val = ("condition_value" ,) cur.execute(sql, val) conn.commit()print (cur.rowcount, "记录被删除。" )
4. 将数据库查询结果转换为Pandas格式 pandas
是Python中强大的数据处理库,将数据库查询结果转换为pandas
的DataFrame
格式,能方便地进行数据分析和处理。
对于MySQL查询结果 假设已完成上述MySQL查询操作并获取了results
。
1 2 3 4 5 import pandas as pd df = pd.DataFrame(results, columns=[desc[0 ] for desc in mycursor.description])print (df)
对于PostgreSQL查询结果 假设已完成上述PostgreSQL查询操作并获取了results
。
1 2 3 4 5 import pandas as pd df = pd.DataFrame(results, columns=[desc[0 ] for desc in cur.description])print (df)
通过以上步骤,可在Python中实现对MySQL和PostgreSQL数据库的基本操作,并将查询结果转换为pandas
格式,便于后续的数据处理工作。在实际应用中,根据具体需求构建更复杂的数据库操作逻辑。
十三、自动化脚本与任务调度 1. 自动化脚本基础 自动化脚本能够自动执行重复性任务,减少人工干预,提高工作效率。在Python中,可通过系统命令执行、文件操作、数据处理等功能实现各种自动化任务场景,如定期备份文件、批量处理数据、自动化测试等。
2. 使用subprocess
模块执行外部命令 subprocess
模块允许在Python脚本中创建新进程并与之通信,从而执行系统命令或其他外部程序。
执行简单命令 执行一个简单的系统命令,如列出当前目录下的文件列表。
1 2 3 4 5 import subprocess result = subprocess.run(['ls' , '-l' ], capture_output=True , text=True )print (result.stdout)
执行带参数命令 执行一个带参数的命令,如使用grep
在文件中查找特定字符串。
1 2 3 4 5 6 7 8 9 import subprocess command = ['grep' , 'python' , 'example.txt' ] result = subprocess.run(command, capture_output=True , text=True )if result.returncode == 0 : print (result.stdout)else : print (f"未找到匹配项: {result.stderr} " )
3. 任务调度基础 任务调度用于在特定时间或按照一定时间间隔自动执行任务。Python中有多种任务调度库可供选择,可根据任务需求和系统环境进行选择。
4. 使用APScheduler
库进行任务调度 APScheduler
是一个强大的Python任务调度库,支持多种调度方式,如定时调度、间隔调度和一次性调度。
安装APScheduler
使用pip install apscheduler
安装该库。
定时执行任务 创建一个任务,每天凌晨2点执行备份数据的函数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 from apscheduler.schedulers.background import BackgroundSchedulerimport timedef backup_data (): print ("正在执行数据备份..." ) scheduler = BackgroundScheduler() scheduler.add_job(backup_data, 'cron' , hour=2 ) scheduler.start()try : while True : time.sleep(1 )except (KeyboardInterrupt, SystemExit): scheduler.shutdown()
间隔执行任务 创建一个任务,每隔5分钟检查一次系统状态。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 from apscheduler.schedulers.background import BackgroundSchedulerimport psutildef check_system_status (): cpu_usage = psutil.cpu_percent() memory_usage = psutil.virtual_memory().percent print (f"CPU使用率: {cpu_usage} %,内存使用率: {memory_usage} %" ) scheduler = BackgroundScheduler() scheduler.add_job(check_system_status, 'interval' , minutes=5 ) scheduler.start()try : while True : time.sleep(1 )except (KeyboardInterrupt, SystemExit): scheduler.shutdown()
十四、图像处理 1. 图像处理概述 Python提供了多个强大的库用于图像处理,其中Pillow
(Python Imaging Library的分支)是常用的库之一。
2. 图像基本操作 安装Pillow
库 使用pip install pillow
命令安装Pillow
库。
打开和显示图像 使用Image.open()
函数打开图像文件,并可使用show()
方法显示图像(此方法会调用系统默认的图像查看器)。
1 2 3 4 5 6 7 from PIL import Image image = Image.open ('image.jpg' ) image.show()
获取图像信息 获取图像的基本信息,如尺寸、格式、模式等。
1 2 3 4 5 6 7 8 9 10 11 12 13 from PIL import Image image = Image.open ('image.jpg' ) width, height = image.sizeprint (f"图像尺寸: {width} x {height} " )print (f"图像格式: {image.format } " )print (f"图像模式: {image.mode} " )
保存图像 将处理后的图像保存为不同格式或使用不同文件名保存。
1 2 3 4 5 6 7 8 9 from PIL import Image image = Image.open ('image.jpg' ) image.save('image.png' ) image.save('new_image.jpg' )
3. 图像裁剪与缩放 ** 图像裁剪** 指定裁剪区域的坐标(左上角和右下角坐标),使用crop()
方法裁剪图像。
1 2 3 4 5 6 7 8 9 10 11 12 13 from PIL import Image image = Image.open ('image.jpg' ) left = 100 top = 100 right = 400 bottom = 400 cropped_image = image.crop((left, top, right, bottom)) cropped_image.show()
** 图像缩放** 使用resize()
方法调整图像大小,指定新的尺寸(宽度和高度)。
1 2 3 4 5 6 7 8 9 10 11 from PIL import Image image = Image.open ('image.jpg' ) new_width = 200 new_height = 200 resized_image = image.resize((new_width, new_height)) resized_image.show()
4. 图像滤镜与特效 Pillow
库还提供了一些内置的滤镜和特效,可通过ImageFilter
模块应用到图像上。模糊滤镜 应用模糊滤镜到图像上,使图像变得模糊。
1 2 3 4 5 6 7 8 9 from PIL import Image, ImageFilter image = Image.open ('image.jpg' ) blurred_image = image.filter (ImageFilter.BLUR) blurred_image.show()
边缘增强滤镜 增强图像边缘,突出图像的轮廓。
1 2 3 4 5 6 7 8 9 from PIL import Image, ImageFilter image = Image.open ('image.jpg' ) edge_enhanced_image = image.filter (ImageFilter.EDGE_ENHANCE) edge_enhanced_image.show()
5. 图像合成与混合 可将多个图像进行合成或混合,创造出更复杂的图像效果。图像合成 将一个图像粘贴到另一个图像上,指定粘贴位置。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 from PIL import Image background = Image.open ('background.jpg' ) foreground = Image.open ('foreground.jpg' ) foreground = foreground.resize((background.width, background.height)) background.paste(foreground, (0 , 0 )) background.show()
图像混合 使用blend()
方法将两个图像按照一定比例混合。
1 2 3 4 5 6 7 8 9 10 from PIL import Image image1 = Image.open ('image1.jpg' ) image2 = Image.open ('image2.jpg' ) mixed_image = Image.blend(image1, image2, alpha=0.5 ) mixed_image.show()
十五、文本处理 1. 大批量文件打开与处理 处理大批量文件时,高效的文件读取和管理策略能够避免内存瓶颈,确保程序稳定运行。
逐行读取文件 使用with
语句打开文件,逐行读取内容,这种方式能有效控制内存使用,适用于大型文件的处理。
1 2 3 4 5 6 7 8 9 def process_large_file (file_path ): with open (file_path, 'r' ) as file: for line in file: processed_line = line.strip().lower() print (processed_line) process_large_file('large_file.txt' )
批量处理多个文件 通过遍历文件列表,依次打开并处理每个文件,实现对大批量文件的批量操作。
1 2 3 4 5 6 7 8 9 def process_files (file_paths ): for file_path in file_paths: with open (file_path, 'r' ) as file: for line in file: print (line.strip()) file_paths = ['file1.txt' , 'file2.txt' , 'file3.txt' ] process_files(file_paths)
2. 字符编码处理 字符编码是文本处理中的关键环节,正确处理编码问题能确保文本数据的准确解读和转换。
常见字符编码
UTF-8 :一种广泛应用于互联网的可变长度字符编码,可表示几乎所有字符,具有良好的兼容性和通用性。
GBK/GB2312 :适用于中文环境的编码标准,GBK是GB2312的扩展,支持更多汉字,常用于处理中文文本。
ASCII :最基础的字符编码,仅包含英文字母、数字和一些常用符号,适用于纯英文文本或简单的控制字符场景。
编码和解码操作 使用encode()
方法将字符串编码为字节串,decode()
方法将字节串解码为字符串,需指定正确的编码格式。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 text = "你好,世界!" encoded_text_utf8 = text.encode('utf-8' )print (encoded_text_utf8) decoded_text_utf8 = encoded_text_utf8.decode('utf-8' )print (decoded_text_utf8) encoded_text_gbk = text.encode('gbk' )print (encoded_text_gbk) decoded_text_gbk = encoded_text_gbk.decode('gbk' )print (decoded_text_gbk)
编码转换 在不同编码之间进行转换,先将字节串解码为字符串,再编码为目标编码格式。
1 2 3 4 text_utf8 = "你好,世界!" .encode('utf-8' ) text_gbk = text_utf8.decode('utf-8' ).encode('gbk' )print (text_gbk)
处理编码错误 在解码过程中遇到编码错误时,可通过errors
参数指定处理方式,如ignore
忽略错误字节或replace
用特定字符替换。
1 2 3 4 encoded_text = b'\xe4\xbd\xa0\xe5\xa5\xbd\xff\xfe\xe4\xb8\x96\xe7\x95\x8c\xef\xbc\x81' decoded_text = encoded_text.decode('utf-8' , errors='replace' )print (decoded_text)
3. 常用正则表达式应用 正则表达式是文本处理的强大工具,能够灵活地匹配、查找、替换和验证文本中的模式。
匹配数字 使用\d
匹配数字字符,+
表示匹配一个或多个数字,可用于提取文本中的数字信息。
1 2 3 4 5 import re text = "订单号为12345,数量是5个" matches = re.findall(r'\d+' , text)print (matches)
匹配电子邮件地址 运用复杂的正则表达式模式匹配电子邮件地址,尽管无法涵盖所有可能的合法格式,但能处理常见情况。
1 2 3 4 5 6 7 import re text = "我的邮箱是user@example.com,请联系我。" pattern = r'[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+' match = re.search(pattern, text)if match : print (match .group())
匹配URL 使用正则表达式匹配URL,提取网页链接等信息。
1 2 3 4 5 6 7 import re text = "这是一个链接:https://www.example.com/page" pattern = r'https?://[^\s]+' match = re.search(pattern, text)if match : print (match .group())
替换字符串 利用sub()
函数根据正则表达式模式替换文本中的部分内容。
1 2 3 4 5 import re text = "今天天气真好,天气真好啊!" new_text = re.sub(r'天气真好' , '阳光明媚' , text)print (new_text)
分割字符串 使用split()
函数结合正则表达式模式按指定分隔符分割字符串。
1 2 3 4 5 import re text = "苹果,香蕉;橙子:葡萄" fruits = re.split(r'[,;:]' , text)print (fruits)
十六、日志记录与调试 1. 日志记录与调试的重要性 常用的日志库有logging、loguru和structlog
2. 使用logging
模块记录日志 基本配置与使用 logging
模块是Python标准库中用于记录日志的强大工具。其基本配置包括设置日志级别、指定日志格式以及确定输出目的地。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 import logging logging.basicConfig(level=logging.DEBUG, format ='%(asctime)s %(name)s %(levelname)s %(message)s' , handlers=[ logging.FileHandler('app.log' ), logging.StreamHandler() ]) logging.debug('这是一条DEBUG级别的日志,用于调试程序细节。' ) logging.info('程序正常运行,执行到此处。' ) logging.warning('出现了一个可能影响程序运行的情况。' ) logging.error('发生错误,需要关注并解决。' ) logging.critical('严重错误,程序可能无法继续正常运行。' )
日志级别与应用场景
DEBUG :用于开发阶段,详细记录程序执行过程中的变量值、函数调用等信息,有助于深入调试。
INFO :在程序正常运行时,记录重要的状态信息、关键操作的执行等,便于了解程序整体运行流程。
WARNING :当程序出现一些不影响运行但可能引发问题的情况时,如配置参数接近临界值、资源使用较高等,发出警告。
ERROR :发生错误时,记录错误详细信息,包括错误类型、出错位置等,方便定位和修复问题。
CRITICAL :表示严重的错误,如系统资源耗尽、关键组件故障等,通常意味着程序无法继续正常工作。
3. 使用loguru
进行高级日志记录 安装与基本用法 loguru
是一个简洁且功能强大的日志记录库,可通过pip install loguru
进行安装。
1 2 3 4 5 6 7 8 9 10 from loguru import logger logger.info("这是一条使用loguru记录的信息日志。" )try : 1 / 0 except ZeroDivisionError as e: logger.error(f"发生错误: {e} " )
特色功能
自动添加上下文信息 :无需额外配置,loguru
会自动记录函数名、行号、模块名等,方便快速定位问题所在的代码位置。
灵活的日志输出配置 :可以轻松地将日志输出到控制台、文件,或者同时输出到多个目的地,并且支持日志文件的轮转和压缩。
1 2 logger.add("app_{time:YYYY-MM-DD}.log" , rotation="1 day" , retention="7 days" , compression="zip" )
美观且易读的日志格式 :默认提供了清晰、易读的日志格式,同时也支持自定义格式。
4. 使用structlog
进行结构化日志记录 安装与基本配置 structlog
专注于创建结构化的日志记录,通过pip install structlog
安装。其基本配置涉及设置处理器链来定义日志的处理流程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 import structlog structlog.configure( processors=[ structlog.stdlib.add_log_level, structlog.processors.TimeStamper(fmt="iso" ), structlog.processors.JSONRenderer() ] ) logger = structlog.get_logger() logger.info("这是一条结构化日志" , key="value" , another_key=123 )
结构化日志的优势
便于分析与处理 :结构化的日志格式使得日志数据易于被机器解析和处理,方便与数据分析工具和平台集成,进行日志数据的聚合、过滤和分析。
增强日志的可读性和可维护性 :通过将相关信息组织成结构化的格式,使日志内容更清晰,易于理解和维护,尤其在复杂系统中,有助于快速定位问题根源。
5. 调试技巧 使用print
语句进行简单调试 在代码中插入print
语句是最直接的调试方法,可输出变量的值、程序执行的中间结果等,帮助开发者了解程序的执行流程和状态。
1 2 3 4 5 6 7 def add_numbers (a, b ): print (f"进入add_numbers函数,参数a={a} ,b={b} " ) result = a + b print (f"计算结果为{result} " ) return result add_numbers(3 , 5 )
使用pdb
进行交互式调试 pdb
(Python Debugger)是Python内置的调试器,提供了强大的交互式调试功能。
设置断点 :在代码中需要调试的位置插入pdb.set_trace()
语句,程序运行到此处将暂停,进入调试模式。
1 2 3 4 5 6 7 8 import pdbdef divide_numbers (a, b ): pdb.set_trace() result = a / b return result divide_numbers(10 , 2 )
常用调试命令 :在pdb
调试模式下,常用命令如下。
n
(next):执行下一行代码。
s
(step):进入函数内部单步执行。
c
(continue):继续执行程序直到下一个断点或程序结束。
p <variable>
:打印变量的值。
q
(quit):退出调试模式。
十七、性能优化 1. 性能优化概述 在Python编程中,性能优化至关重要,它直接影响程序的运行速度、资源利用率和用户体验。随着数据量的增长和应用程序复杂性的增加,优化代码性能能够提高程序的响应能力、降低资源消耗,使程序在各种环境下更加高效地运行。
2. 性能分析工具 cProfile
模块cProfile
是Python标准库中的性能分析工具,可用于分析代码中各个函数的执行时间和调用次数,帮助定位性能瓶颈。
1 2 3 4 5 6 7 8 9 10 import cProfiledef slow_function (): total = 0 for i in range (1000000 ): total += i return total cProfile.run('slow_function()' )
line_profiler
库(需额外安装)line_profiler
提供了逐行分析代码性能的功能,能更精确地找出耗时的代码行。
安装:pip install line_profiler
使用:在需要分析的函数前添加@profile
装饰器,然后在命令行中使用kernprof -l -v your_script.py
运行脚本进行分析。
1 2 3 4 5 6 7 @profile def another_slow_function (): result = 0 for i in range (1000 ): for j in range (1000 ): result += i * j return result
3. 代码优化策略 算法优化 选择更高效的算法和数据结构是性能优化的关键。例如,使用字典(dict
)查找元素的时间复杂度为O(1),而在列表(list
)中查找元素的时间复杂度为O(n)。
1 2 3 4 5 6 7 8 9 10 my_list = [1 , 2 , 3 , 4 , 5 ]for i in range (len (my_list)): if my_list[i] == 3 : print ("找到元素" ) my_dict = {i: i for i in my_list}if 3 in my_dict: print ("找到元素" )
减少不必要的计算 避免重复计算,将计算结果缓存起来,在后续需要时直接使用。
1 2 3 4 5 6 7 8 import mathdef calculate_area (radius ): return math.pi * radius ** 2 area = calculate_area(5 )
优化循环
尽量减少循环中的复杂计算和函数调用,将计算结果提前计算或提取到循环外。
1 2 3 4 5 6 7 8 my_list = [1 , 2 , 3 , 4 , 5 ] new_list = []for i in my_list: new_list.append(math.sqrt(i)) new_list = [math.sqrt(i) for i in my_list]
在循环中,使用enumerate()
函数替代手动获取索引和元素,提高代码可读性和性能。
1 2 3 4 5 6 7 8 my_list = ['a' , 'b' , 'c' , 'd' ]for i in range (len (my_list)): print (i, my_list[i])for i, item in enumerate (my_list): print (i, item)
4. 使用numba
加速计算密集型任务(需额外安装) numba
是一个即时(JIT)编译器,能够将Python函数编译为机器码,显著提高计算密集型任务的执行速度。
安装:pip install numba
使用示例:
1 2 3 4 5 6 7 8 import numba@numba.jit(nopython=True ) def add_numbers (a, b ): return a + b result = add_numbers(3 , 5 )
5. 内存管理优化 及时释放不再使用的内存 手动删除不再需要的对象,释放内存空间,避免内存泄漏。
1 2 3 4 my_list = [i for i in range (1000000 )]del my_list
使用生成器代替列表 当处理大量数据时,如果不需要一次性将所有数据存储在内存中,可以使用生成器表达式,逐个生成数据项,减少内存占用。
1 2 3 4 5 6 7 my_list = [i ** 2 for i in range (1000000 )] my_generator = (i ** 2 for i in range (1000000 ))for item in my_generator: print (item)
十八、安全与加密 1. 密码哈希 密码哈希是将密码转换为不可逆的哈希值存储在数据库中的过程,以保护用户密码的安全性。
使用bcrypt
库进行密码哈希(需额外安装:pip install bcrypt
) bcrypt
是一种广泛使用的密码哈希算法,具有良好的安全性。
1 2 3 4 5 6 7 8 9 10 import bcrypt salt = bcrypt.gensalt() password = "mysecretpassword" .encode('utf-8' ) hashed_password = bcrypt.hashpw(password, salt)print (hashed_password)
验证密码 在用户登录时,验证输入的密码与存储的哈希值是否匹配。
1 2 3 4 5 entered_password = "mysecretpassword" .encode('utf-8' )if bcrypt.checkpw(entered_password, hashed_password): print ("密码正确" )else : print ("密码错误" )
2. 数据加密与解密 对敏感数据进行加密,确保数据在传输和存储过程中的保密性。
使用cryptography
库进行对称加密(需额外安装:pip install cryptography
) 对称加密使用相同的密钥进行加密和解密。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 from cryptography.fernet import Fernet key = Fernet.generate_key() cipher_suite = Fernet(key) data = "敏感信息" .encode('utf-8' ) encrypted_data = cipher_suite.encrypt(data)print (encrypted_data) decrypted_data = cipher_suite.decrypt(encrypted_data)print (decrypted_data.decode('utf-8' ))
3. 安全随机数生成 在密码重置令牌、加密密钥生成等场景中,需要使用安全的随机数。
使用secrets
模块生成安全随机数 secrets
模块提供了生成密码学安全随机数的功能。
1 2 3 4 5 6 7 8 9 import secrets token = secrets.randbelow(1000000 )print (token) key = secrets.token_hex(16 )print (key)
4. 防止SQL注入 在与数据库交互时,防止SQL注入攻击至关重要。
使用参数化查询 在Python中,使用数据库驱动提供的参数化查询功能,避免将用户输入直接嵌入SQL语句中。例如,使用psycopg2
库连接PostgreSQL数据库时:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 import psycopg2 conn = psycopg2.connect( dbname="your_database" , user="your_user" , password="your_password" , host="your_host" ) cur = conn.cursor() username = "user1" password = "password1" cur.execute("INSERT INTO users (username, password) VALUES (%s, %s)" , (username, password)) conn.commit() cur.close() conn.close()
5. 防止跨站脚本攻击(XSS) 在Web应用程序中,防止XSS攻击以保护用户免受恶意脚本的侵害。
对用户输入进行过滤和转义 在将用户输入输出到HTML页面时,使用合适的模板引擎或手动进行HTML转义,确保用户输入不会被解释为脚本。例如,在使用Flask框架时:
1 2 3 4 5 6 7 8 9 10 11 12 13 from flask import Flask, request, render_template_string app = Flask(__name__)@app.route('/' ) def index (): user_input = request.args.get('input' ) escaped_input = escape(user_input) return render_template_string("<p>{{ escaped_input }}</p>" , escaped_input=escaped_input)if __name__ == '__main__' : app.run()
6. 安全通信(HTTPS) 确保Web应用程序与客户端之间的通信安全,防止数据在传输过程中被窃取或篡改。
使用ssl
模块创建HTTPS服务器(简单示例) 在创建网络服务器时,使用ssl
模块配置SSL/TLS证书,实现HTTPS通信。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 import socketimport ssl server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server_socket.bind(('localhost' , 8000 )) server_socket.listen(5 ) context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) context.load_cert_chain(certfile='server.crt' , keyfile='server.key' ) ssl_socket = context.wrap_socket(server_socket, server_side=True )while True : client_socket, client_address = ssl_socket.accept() request = client_socket.recv(1024 ).decode('utf-8' ) print (request) response = "HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\n\r\nHello, World!" client_socket.send(response.encode('utf-8' )) client_socket.close()