造個Python輪子,實現根據Excel生成Model和資料匯入指令碼

2022-11-16 12:00:11

前言

最近遇到一個需求,有幾十個Excel,每個的欄位都不一樣,然後都差不多是第一行是表頭,後面幾千上萬的資料,需要把這些Excel中的資料全都加入某個已經上線的Django專案

這就需要每個Excel建個表,然後一個個匯入了

這樣的效率太低,不能忍

所以我造了個自動生成 Model 和匯入指令碼的輪子

思路

首先拿出 pandas,它的 DataFrame 用來處理資料很方便

pandas 載入 Excel 之後,提取表頭,我們要通過表頭來生成資料表的欄位。有些 Excel 的表頭是中文的,需要先做個轉換。

一開始我是想用翻譯API,全都翻譯成英文,不過發現免費的很慢有限額,微軟、DeepL都要申請,很麻煩。索性用個拼音轉換庫,全都轉換成拼音得了~

然後欄位的長度也要確定,或者全部用不限制長度的 TextField

權衡一下,我還是做一下欄位長度判定的邏輯,遍歷整個表,找出各個欄位最長的資料,然後再加一個偏移量,作為最大長度。

接著生成 Model 類,這裡我用 jinja2 模板語言,先把大概的模板寫好,然後根據提取出來的欄位名啥的生成。

最後生成 admin 設定和匯入指令碼,同理,也是用 jinja2 模板。

實現

簡單介紹下思路,現在開始上程式碼。

就幾行而已,Python很省程式碼~

模型

首先定義倆模型

欄位模型

class Field(object):
    def __init__(self, name: str, verbose_name: str, max_length: int = 128):
        self.name = name
        self.verbose_name = verbose_name
        self.max_length = max_length

    def __str__(self):
        return f'<Field>{self.name}:{self.verbose_name}'

    def __repr__(self):
        return self.__str__()

Model模型

為了符合Python關於變數的命名規範,snake_name 屬性是用正規表示式實現駝峰命名轉蛇形命名

class Model(object):
    def __init__(self, name: str, verbose_name: str, id_field: Field, fields: List[Field]):
        self.name = name
        self.verbose_name = verbose_name
        self.id_field = id_field
        self.fields: List[Field] = fields

    @property
    def snake_name(self):
        import re
        pattern = re.compile(r'(?<!^)(?=[A-Z])')
        name = pattern.sub('_', self.name).lower()
        return name

    def __str__(self):
        return f'<Model>{self.name}:{self.verbose_name}'

    def __repr__(self):
        return self.__str__()

程式碼模板

使用 jinja2 實現。

本身 jinja2 是 Flask、Django 之類的框架用來渲染網頁的。

不過單獨使用的效果也不錯,我的 DjangoStarter 框架也是用這個 jinja2 來自動生成 CRUD 程式碼~

Model模板

# -*- coding:utf-8 -*-
from django.db import models

class {{ model.name }}(models.Model):
    """{{ model.verbose_name }}"""
    {% for field in model.fields -%}
    {{ field.name }} = models.CharField('{{ field.verbose_name }}', default='', null=True, blank=True, max_length={{ field.max_length }})
    {% endfor %}
    class Meta:
        db_table = '{{ model.snake_name }}'
        verbose_name = '{{ model.verbose_name }}'
        verbose_name_plural = verbose_name

Admin設定模板

@admin.register({{ model.name }})
class {{ model.name }}Admin(admin.ModelAdmin):
    list_display = [{% for field in model.fields %}'{{ field.name }}', {% endfor %}]
    list_display_links = None

    def has_add_permission(self, request):
        return False

    def has_delete_permission(self, request, obj=None):
        return False

    def has_view_permission(self, request, obj=None):
        return False

資料匯入指令碼

這裡做了幾件事:

  • 使用 pandas 處理空值,填充空字串
  • 已有資料進行批次更新
  • 新資料批次插入

更新邏輯麻煩一點,因為資料庫一般都有每次最大更新數量的限制,所以我做了分批次處理,通過 update_data_once_max_lines 控制每次最多同時更新多少條資料。

def import_{{ model.snake_name }}():
    file_path = path_proc(r'{{ excel_filepath }}')

    logger.info(f'讀取檔案: {file_path}')
    xlsx = pd.ExcelFile(file_path)
    df = pd.read_excel(xlsx, 0, header={{ excel_header }})
    df.fillna('', inplace=True)

    logger.info('開始處理資料')

    id_field_list = {{ model.name }}.objects.values_list('{{ model.id_field.name }}', flat=True)
    item_list = list({{ model.name }}.objects.all())

    def get_item(id_value):
        for i in item_list:
            if i.shen_qing_ren_zheng_jian_hao_ma == id_value:
                return i
        return None

    insert_data = []
    update_data_once_max_lines = 100
    update_data_sub_set_index = 0
    update_data = [[]]
    update_fields = set()

    for index, row in df.iterrows():
        if '{{ model.id_field.verbose_name }}' not in row:
            logger.error('id_field {} is not existed'.format('{{ model.id_field.verbose_name }}'))
            continue

        if row['{{ model.id_field.verbose_name }}'] in id_field_list:
            item = get_item(row['{{ model.id_field.verbose_name }}'])
            {% for field in model.fields -%}
            if '{{ field.verbose_name }}' in row:
                if item.{{ field.name }} != row['{{ field.verbose_name }}']:
                    item.{{ field.name }} = row['{{ field.verbose_name }}']
                    update_fields.add('{{ field.name }}')
            {% endfor %}
            if len(update_data[update_data_sub_set_index]) >= update_data_once_max_lines:
                update_data_sub_set_index += 1
                update_data.append([])
            update_data[update_data_sub_set_index].append(item)
        else:
            # {% for field in model.fields -%}{{ field.verbose_name }},{%- endfor %}
            model_obj = {{ model.name }}()
            {% for field in model.fields -%}
            if '{{ field.verbose_name }}' in row:
                model_obj.{{ field.name }} = row['{{ field.verbose_name }}']
            {% endfor %}
            insert_data.append(model_obj)

    logger.info('開始批次匯入')
    {{ model.name }}.objects.bulk_create(insert_data)
    logger.info('匯入完成')

    if len(update_data[update_data_sub_set_index]) > 0:
        logger.info('開始批次更新')
        for index, update_sub in enumerate(update_data):
            logger.info(f'正在更新 {index * update_data_once_max_lines}-{(index + 1) * update_data_once_max_lines} 條資料')
            {{ model.name }}.objects.bulk_update(update_sub, list(update_fields))
        logger.info('更新完成')

主體程式碼

剩下的全是核心程式碼了

參照依賴

先把用到的庫匯入

import os
import re
from typing import List, Optional

from pypinyin import pinyin, lazy_pinyin, Style
from jinja2 import Environment, PackageLoader, FileSystemLoader

或者後面直接去我的完整程式碼裡面拿也行~

老規矩,我封裝了一個類。

構造方法需要指定 Excel 檔案地址,還有表頭的行索引。

class ExcelToModel(object):
    def __init__(self, filepath, header_index=0):
        self.filepath = filepath
        self.header_index = header_index
        self.columns = []
        self.fields: List[Field] = []

        self.base_dir = os.path.dirname(os.path.abspath(__file__))
        self.template_path = os.path.join(self.base_dir, 'templates')
        self.jinja2_env = Environment(loader=FileSystemLoader(self.template_path))

        self.load_file()

這裡面有個 self.load_file() 後面再貼。

欄位名中文轉拼音

用了 pypinyin 這個庫,感覺還不錯。

轉換後用正規表示式,去除符號,只保留英文和數位。

程式碼如下,也是放在 ExcelToModel 類裡邊。

@staticmethod
def to_pinyin(text: str) -> str:
    pattern = r'~`!#$%^&*()_+-=|\';"":/.,?><~·!@#¥%……&*()——+-=「:’;、。,?》{《}】【\n\]\[ '
    text = re.sub(r"[%s]+" % pattern, "", text)
    return '_'.join(lazy_pinyin(text, style=Style.NORMAL))

載入檔案

拿出萬能的 pandas,按照前面說的思路,提取表頭轉換成欄位,並且遍歷資料確定每個欄位的最大長度,我這裡偏移值是32,即在當前資料最大長度基礎上加上32個字元。

def load_file(self):
    import pandas as pd
    xlsx = pd.ExcelFile(self.filepath)
    df = pd.read_excel(xlsx, 0, header=self.header_index)
    df.fillna('', inplace=True)
    self.columns = list(df.columns)
    for col in self.columns:
        field = Field(self.to_pinyin(col), col)
        self.fields.append(field)
        for index, row in df.iterrows():
            item_len = len(str(row[col]))
            if item_len > field.max_length:
                field.max_length = item_len + 32

        print(field.verbose_name, field.name, field.max_length)

如果覺得這樣生成表太慢,可以把確定最大長度的這塊程式碼去掉,就下面這塊程式碼

for index, row in df.iterrows():
    item_len = len(str(row[col]))
    if item_len > field.max_length:
        field.max_length = item_len + 32

手動指定最大長度或者換成不限制長度的 TextField 就行。

生成檔案

先構造個 context 然後直接用 jinja2 的 render 功能生成程式碼。

為了在匯入時判斷資料存不存在,生成程式碼時要指定 id_field_verbose_name,即Excel檔案中類似「證件號碼」、「編號」之類的列名,注意是Excel中的表頭列名。

def find_field_by_verbose_name(self, verbose_name) -> Optional[Field]:
    for field in self.fields:
        if field.verbose_name == verbose_name:
            return field
    return None

def generate_file(self, model_name: str, verbose_name: str, id_field_verbose_name: str, output_filepath: str):
    template = self.jinja2_env.get_template('output.jinja2')
    context = {
        'model': Model(
            model_name, verbose_name,
            self.find_field_by_verbose_name(id_field_verbose_name),
            self.fields
        ),
        'excel_filepath': self.filepath,
        'excel_header': self.header_index,
    }
    with open(output_filepath, 'w+', encoding='utf-8') as f:
        render_result = template.render(context)
        f.write(render_result)

使用

看程式碼。

tool = ExcelToModel('file.xlsx')
tool.generate_file('CitizenFertility', '房價與居民生育率', '證件號碼', 'output/citizen_fertility.py')

生成出來的程式碼都在一個檔案裡,請根據實際情況放到專案的各個位置。

完整程式碼

釋出到Github了

地址: https://github.com/Deali-Axy/excel_to_model

小結

目前看來完美契合需求,極大節省工作量~

實際跑起來,不得不吐槽 Python 羸弱的效能,佔記憶體還大… 湊合著用吧。也許後面有時間會優化一下~