trafilatura 網頁解析原理分析

2023-11-01 06:01:34

trafilatura 介紹

Trafilatura是一個Python包和命令列工具,用於收集網路上的文字。其主要應用場景包括網路爬蟲下載和網頁解析等。

今天我們不討論爬蟲和抓取,主要看他的資料解析是如何做的。

extract初體驗

from trafilatura import fetch_url, extract
url = 'https://haokan.baidu.com/v?pd=wisenatural&vid=292842333147844218'
downloaded = fetch_url(url)
result = extract(downloaded, output_format="json")
print(result)

結果:

{"title": "日本東電公佈核汙水排海瞬間:核對程式後啟動,有工作人員抽檢濃度", "author": null, "hostname": "baidu.com", "date": "2023-10-30", "fingerprint": "ffffffffffffffff", "id": null, "license": null, "comments": "", "raw_text": "日本東電公佈核汙水排海瞬間:核對程式後啟動,有工作人員抽檢濃度,國際,國際社會,好看視訊下載使用者端創作中心訊息上傳視訊61萬次播放 | 釋出時間:2023年8月25日01.3萬收藏日本東電公佈核汙水排海瞬間:核對程式後啟動,有工作人員抽檢濃度接下來播放猜你喜歡", "text": "日本東電公佈核汙水排海瞬間:核對程式後啟動,有工作人員抽檢濃度,國際,國際社會,好看視訊下載使用者端創作中心訊息上傳視訊61萬次播放 | 釋出時間:2023年8月25日01.3萬收藏日本東電公佈核汙水排海瞬間:核對程式後啟動,有工作人員抽檢濃度接下來播放猜你喜歡", "language": null, "image": "https://f7.baidu.com/it/u=3372340810,1415940711&fm=222&app=106&f=JPEG@s_0,w_800,h_1000,q_80,f_auto", "pagetype": "video", "source": "https://haokan.baidu.com/v?vid=292842333147844218&tab=recommend", "source-hostname": "haokan.baidu.com", "excerpt": "日本東電公佈核汙水排海瞬間:核對程式後啟動,有工作人員抽檢濃度,本視訊由青蜂俠Bee提供,607479次播放,好看視訊是由百度團隊打造的集內涵和顏值於一身的專業短視訊聚合平臺", "categories": "", "tags": "國際,國際社會,科普資料,科普諮詢,科普電影,科普電視劇,科普綜藝,科普話題,科普貼文,科普mv,科普視訊,科普線上,科普下載,科普觀看,科普直播,資料,諮詢,電影,電視劇,綜藝,話題,貼文,mv,視訊,線上,下載,觀看,直播,科普,國際社會,科學,日本東電,核汙水"}

extract 分析

extract 函數定義如下,預設output_format為txt,只提取正文


def extract(filecontent, url=None, record_id=None, no_fallback=False,
            favor_precision=False, favor_recall=False,
            include_comments=True, output_format='txt',
            tei_validation=False, target_language=None,
            include_tables=True, include_images=False, include_formatting=False,
            include_links=False, deduplicate=False,
            date_extraction_params=None,
            only_with_metadata=False, with_metadata=False,
            max_tree_size=None, url_blacklist=None, author_blacklist=None,
            settingsfile=None, config=DEFAULT_CONFIG,
            **kwargs):
    """Main function exposed by the package:
       Wrapper for text extraction and conversion to chosen output format.

extract裡,主要是呼叫bare_extraction

  • 首先用lxml載入tree = load_html(filecontent)
  • 然後check_html_lang, 如果設定了target_language, 但網頁不匹配會返回錯誤

meta解析

  • 接著解析extract_metadata meta資訊解析,從header裡解析內容
    • 首先,examine_meta, 先嚐試extract_opengraph,有的網站符合Search meta tags following the OpenGraph guidelines (https://ogp.me/) 規範
    • 如果不符合OpenGraph規範,則從meta裡提取 extract_meta_json,這裡有很多meta設定,比如OG_AUTHOR = {'og:author', 'og:article:author'}, 當meta裡有匹配的規則時,會填充到meta中
    • title 識別失敗的,從H1 和 設定的xpath獲取
title_xpaths = [
    '//*[(self::h1 or self::h2)][contains(@class, "post-title") or contains(@class, "entry-title") or contains(@class, "headline") or contains(@id, "headline") or contains(@itemprop, "headline") or contains(@class, "post__title") or contains(@class, "article-title")]',
    '//*[@class="entry-title" or @class="post-title"]',
    '//*[(self::h1 or self::h2 or self::h3)][contains(@class, "title") or contains(@id, "title")]',
]
  • author 識別失敗的,從設定的xpath獲取
author_xpaths = [
    '//*[(self::a or self::address or self::div or self::link or self::p or self::span or self::strong)][@rel="author" or @id="author" or @class="author" or @itemprop="author name" or rel="me" or contains(@class, "author-name") or contains(@class, "AuthorName") or contains(@class, "authorName") or contains(@class, "author name")]|//author', # specific and almost specific
    '//*[(self::a or self::div or self::h3 or self::h4 or self::p or self::span)][contains(@class, "author") or contains(@id, "author") or contains(@itemprop, "author") or @class="byline" or contains(@id, "zuozhe") or contains(@class, "zuozhe") or contains(@id, "bianji") or contains(@class, "bianji") or contains(@id, "xiaobian") or contains(@class, "xiaobian") or contains(@class, "submitted-by") or contains(@class, "posted-by") or @class="username" or @class="BBL" or contains(@class, "journalist-name")]', # almost generic and generic, last ones not common
    '//*[contains(translate(@id, "A", "a"), "author") or contains(translate(@class, "A", "a"), "author") or contains(@class, "screenname") or contains(@data-component, "Byline") or contains(@itemprop, "author") or contains(@class, "writer") or contains(translate(@class, "B", "b"), "byline")]', # last resort: any element
]
  • image 識別失敗的,從設定的xpath獲取
    for elem in tree.xpath('.//head/meta[@property="og:image" or @property="og:image:url"][@content]'):
        return elem.get('content')

    for elem in tree.xpath('.//head/meta[@property="twitter:image" or @property="twitter:image:src"][@content]'):
        return elem.get('content')
  • sitename 識別失敗的,會從title去識別, examine_title_element, 靠正則去匹配HTMLTITLE_REGEX = re.compile(r'^(.+)?\s+[–•·—|⁄*⋆~‹«<›»>:-]\s+(.+)$') # part without dots?, 這個對中文網頁好像不太行
  • 其他的還同步識別了tags,就是關鍵詞

正文識別

正文識別,設定options

    # regroup extraction options
    options = Extractor(config, no_fallback, favor_precision, favor_recall,
                        include_comments, include_formatting, include_links,
                        include_images, include_tables, deduplicate,
                        target_language)

然後backup tree 和清理tree

    # backup (or not) for further processing
    tree_backup_1 = deepcopy(tree) if no_fallback is False else None
    tree_backup_2 = deepcopy(tree)

    # clean + use LXML cleaner
    cleaned_tree = tree_cleaning(tree, options)
    cleaned_tree_backup = deepcopy(cleaned_tree)

    # convert tags, the rest does not work without conversion
    cleaned_tree = convert_tags(cleaned_tree, options, url or document.url)

識別評論:

    # comments first, then remove
    if include_comments is True:
        commentsbody, temp_comments, len_comments, cleaned_tree = extract_comments(cleaned_tree, options)
    else:
        commentsbody, temp_comments, len_comments = None, '', 0

提升精度, 將一些unwanted_nodes清理掉:

        if favor_precision is True:
            cleaned_tree = prune_unwanted_nodes(cleaned_tree, REMOVE_COMMENTS_XPATH)

這裡的REMOVE_COMMENTS_XPATH, 主要是一些常見的comment

REMOVE_COMMENTS_XPATH = [
    """.//*[(self::div or self::list or self::section)][
    starts-with(translate(@id, "C","c"), 'comment') or
    starts-with(translate(@class, "C","c"), 'comment') or
    contains(@class, 'article-comments') or contains(@class, 'post-comments')
    or starts-with(@id, 'comol') or starts-with(@id, 'disqus_thread')
    or starts-with(@id, 'dsq-comments')
    ]""",
]

然後是正餐,提取正文

# extract content
postbody, temp_text, len_text = extract_content(cleaned_tree, options)

主要原理是使用一組XPath表示式找到頁面的主要內容,然後提取相關元素,並去除不需要的部分:

  • 定義正文潛在的候選標籤potential_tags
TAG_CATALOG = frozenset(['blockquote', 'code', 'del', 'head', 'hi', 'lb', 'list', 'p', 'pre', 'quote'])

potential_tags = set(TAG_CATALOG)
if options.tables is True:
    potential_tags.update(['table', 'td', 'th', 'tr'])
if options.images is True:
    potential_tags.add('graphic')
if options.links is True:
    potential_tags.add('ref')

然後,從設定的xpath裡,去提取正文,來看看定義的xpath,真是大力出奇跡

BODY_XPATH = [
    '''.//*[(self::article or self::div or self::main or self::section)][
    @class="post" or @class="entry" or
    contains(@class, "post-text") or contains(@class, "post_text") or
    contains(@class, "post-body") or contains(@class, "post-entry") or contains(@class, "postentry") or
    contains(@class, "post-content") or contains(@class, "post_content") or
    contains(@class, "postcontent") or contains(@class, "postContent") or
    contains(@class, "article-text") or contains(@class, "articletext") or contains(@class, "articleText")
    or contains(@id, "entry-content") or
    contains(@class, "entry-content") or contains(@id, "article-content") or
    contains(@class, "article-content") or contains(@id, "article__content") or
    contains(@class, "article__content") or contains(@id, "article-body") or
    contains(@class, "article-body") or contains(@id, "article__body") or
    contains(@class, "article__body") or @itemprop="articleBody" or
    contains(translate(@id, "B", "b"), "articlebody") or contains(translate(@class, "B", "b"), "articleBody")
    or @id="articleContent" or contains(@class, "ArticleContent") or
    contains(@class, "page-content") or contains(@class, "text-content") or
    contains(@id, "body-text") or contains(@class, "body-text") or
    contains(@class, "article__container") or contains(@id, "art-content") or contains(@class, "art-content")][1]''',
    # (…)[1] = first occurrence
    '(.//article)[1]',
    """(.//*[(self::article or self::div or self::main or self::section)][
    contains(@class, 'post-bodycopy') or
    contains(@class, 'storycontent') or contains(@class, 'story-content') or
    @class='postarea' or @class='art-postcontent' or
    contains(@class, 'theme-content') or contains(@class, 'blog-content') or
    contains(@class, 'section-content') or contains(@class, 'single-content') or
    contains(@class, 'single-post') or
    contains(@class, 'main-column') or contains(@class, 'wpb_text_column') or
    starts-with(@id, 'primary') or starts-with(@class, 'article ') or @class="text" or
    @id="article" or @class="cell" or @id="story" or @class="story" or
    contains(@class, "story-body") or contains(@class, "field-body") or
    contains(translate(@class, "FULTEX","fultex"), "fulltext")
    or @role='article'])[1]""",
    '''(.//*[(self::article or self::div or self::main or self::section)][
    contains(@id, "content-main") or contains(@class, "content-main") or contains(@class, "content_main") or
    contains(@id, "content-body") or contains(@class, "content-body") or contains(@id, "contentBody")
    or contains(@class, "content__body") or contains(translate(@id, "CM","cm"), "main-content") or contains(translate(@class, "CM","cm"), "main-content")
    or contains(translate(@class, "CP","cp"), "page-content") or
    @id="content" or @class="content"])[1]''',
    '(.//*[(self::article or self::div or self::section)][starts-with(@class, "main") or starts-with(@id, "main") or starts-with(@role, "main")])[1]|(.//main)[1]',
]

然後解析簡單了,依次遍歷:

    for expr in BODY_XPATH:
        # select tree if the expression has been found
        try:
            subtree = tree.xpath(expr)[0]
        except IndexError:
            continue

對於匹配上的,開始細節處理:


    # prune the subtree
    subtree = prune_unwanted_sections(subtree, potential_tags, options)
    # second pass?
    # subtree = delete_by_link_density(subtree, 'list', backtracking=False, favor_precision=options.precision)
    if 'table' in potential_tags or options.precision is True:
        for elem in subtree.iter('table'):
            if link_density_test_tables(elem) is True:
                elem.getparent().remove(elem)
    # skip if empty tree
    if len(subtree) == 0:
        continue
    # no paragraphs containing text, or not enough
    ptest = subtree.xpath('//p//text()')
    if options.recall is True:
        factor = 5
    elif options.precision is True:
        factor = 1
    else:
        factor = 3
    if not ptest or len(''.join(ptest)) < options.config.getint('DEFAULT', 'MIN_EXTRACTED_SIZE') * factor:
        potential_tags.add('div')
    # polish list of potential tags
    if 'ref' not in potential_tags:
        strip_tags(subtree, 'ref')
    if 'span' not in potential_tags:
        strip_tags(subtree, 'span')
    LOGGER.debug(sorted(potential_tags))
    # proper extraction
    subelems = subtree.xpath('.//*')
    # e.g. only lb-elems in a div
    if {e.tag for e in subelems} == {'lb'}:
        subelems = [subtree]
    # extract content
    result_body.extend(filter(lambda x: x is not None, (handle_textelem(e, potential_tags, options) for e in subelems)))
    # remove trailing titles
    while len(result_body) > 0 and (result_body[-1].tag in NOT_AT_THE_END):
        result_body[-1].getparent().remove(result_body[-1])
    # exit the loop if the result has children
    if len(result_body) > 1:
        LOGGER.debug(expr)
        break
  • 對子樹進行修剪,刪除不需要的部分。
  • 如果potential_tags中包含'table'或者選項中設定了優先精度precision,那麼遍歷子樹中的所有表格元素,如果裡面連結過多,會剔除這個表格
  • 如果子樹為空,跳過當前回圈
  • 如果沒有包含文字的段落,或者段落數量不足,將'div'新增到候選標籤集合中
  • 如果span和ref不在候選標籤,去除'ref'和'span'標籤
  • 最主要的提取內容,handle_textelem, 如果提取到結果,則退出(這裡可能有問題,比如後面的xpath也能匹配到內容)

def handle_textelem(element, potential_tags, options):
    '''Process text element and determine how to deal with its content'''
    new_element = None
    # bypass: nested elements
    if element.tag == 'list':
        new_element = handle_lists(element, options)
    elif element.tag in CODES_QUOTES:
        new_element = handle_quotes(element, options)
    elif element.tag == 'head':
        new_element = handle_titles(element, options)
    elif element.tag == 'p':
        new_element = handle_paragraphs(element, potential_tags, options)
    elif element.tag == 'lb':
        if text_chars_test(element.tail) is True:
            element = process_node(element, options)
            if element is not None:
                new_element = Element('p')
                new_element.text = element.tail
    elif element.tag in FORMATTING:
        new_element = handle_formatting(element, options)  # process_node(element, options)
    elif element.tag == 'table' and 'table' in potential_tags:
        new_element = handle_table(element, potential_tags, options)
    elif element.tag == 'graphic' and 'graphic' in potential_tags:
        new_element = handle_image(element)
    else:
        # other elements (div, ??, ??)
        new_element = handle_other_elements(element, potential_tags, options)
    return new_element

該函數,根據element.tag的值,呼叫不同的處理常式來處理不同型別的元素。例如,如果element.tag等於'list',則呼叫handle_lists函數;如果element.tag在CODES_QUOTES中,則呼叫handle_quotes函數,依此類推。

我們看一個處理p標籤的:


def handle_paragraphs(element, potential_tags, options):
    '''Process paragraphs (p) elements along with their children,
       trim and clean the content'''
    element.attrib.clear()
    # strip_tags(element, 'p') # change in precision due to spaces?
    # no children
    if len(element) == 0:
        processed_element = process_node(element, options)
        if processed_element is not None:
            return processed_element
        return None
    # children
    processed_element = Element(element.tag)
    for child in element.iter('*'):
        if child.tag not in potential_tags and child.tag != 'done':
            LOGGER.debug('unexpected in p: %s %s %s', child.tag, child.text, child.tail)
            continue
        # spacing = child.tag in SPACING_PROTECTED  # todo: outputformat.startswith('xml')?
        # todo: act on spacing here?
        processed_child = handle_textnode(child, options, comments_fix=False, preserve_spaces=True)
        if processed_child is not None:
            # todo: needing attention!
            if processed_child.tag == 'p':
                LOGGER.debug('extra p within p: %s %s %s', processed_child.tag, processed_child.text,
                             processed_child.tail)
                if processed_element.text:
                    processed_element.text += ' ' + processed_child.text
                else:
                    processed_element.text = processed_child.text
                continue
            # handle formatting
            newsub = Element(child.tag)
            if processed_child.tag in P_FORMATTING:
                # check depth and clean
                if len(processed_child) > 0:
                    for item in processed_child:  # children are lists
                        if text_chars_test(item.text) is True:
                            item.text = ' ' + item.text
                        strip_tags(processed_child, item.tag)
                # correct attributes
                if child.tag == 'hi':
                    newsub.set('rend', child.get('rend'))
                elif child.tag == 'ref':
                    if child.get('target') is not None:
                        newsub.set('target', child.get('target'))
            # handle line breaks
            # elif processed_child.tag == 'lb':
            #    try:
            #        processed_child.tail = process_node(child, options).tail
            #    except AttributeError:  # no text
            #        pass
            # prepare text
            # todo: to be moved to handle_textnode()
            # if text_chars_test(processed_child.text) is False:
            #    processed_child.text = ''
            # if text_chars_test(processed_child.tail) is False:
            #    processed_child.tail = ''
            # if there are already children
            # if len(processed_element) > 0:
            #    if text_chars_test(processed_child.tail) is True:
            #        newsub.tail = processed_child.text + processed_child.tail
            #    else:
            #        newsub.tail = processed_child.text
            newsub.text, newsub.tail = processed_child.text, processed_child.tail
            processed_element.append(newsub)
        child.tag = 'done'
    # finish
    if len(processed_element) > 0:
        # clean trailing lb-elements
        if (
                processed_element[-1].tag == 'lb'
                and processed_element[-1].tail is None
        ):
            processed_element[-1].getparent().remove(processed_element[-1])
        return processed_element
    if processed_element.text:
        return processed_element
    LOGGER.debug('discarding p-child: %s', tostring(processed_element))
    return None
  • 首先,清除element的屬性
  • 根據element是否有子元素,分別進行處理。
    • 如果沒有子元素,直接呼叫process_node函數處理element,並返回處理結果
    • 如果有子元素,建立一個新的Element物件,遍歷所有子元素,並根據子元素的標籤進行相應的處理, 遍歷提取文字

咱們視角繼續回到extract_content, 如果result_body有值,那麼

    temp_text = ' '.join(result_body.itertext()).strip()
    # try parsing wild <p> elements if nothing found or text too short
    # todo: test precision and recall settings here
    if len(result_body) == 0 or len(temp_text) < options.config.getint('DEFAULT', 'MIN_EXTRACTED_SIZE'):
        result_body = recover_wild_text(backup_tree, result_body, options, potential_tags)
        temp_text = ' '.join(result_body.itertext()).strip()
    # filter output
    strip_elements(result_body, 'done')
    strip_tags(result_body, 'div')
    # return
    return result_body, temp_text, len(temp_text)
  • result_body中的所有文位元組點連線成一個字串,並去除首尾的空白字元,賦值給temp_text
  • 如果result_body為空或者temp_text的長度小於設定中的最小提取大小(MIN_EXTRACTED_SIZE),則嘗試從備份樹(backup_tree)中恢復原始文字,並重新計算temp_text (這個對於上面提到的誤判,有一個修正)
  • 對result_body進行過濾,移除包含'done'文字的元素和所有'div'標籤
  • 最後返回處理後的result_body、temp_text以及temp_text的長度

繼續回到bare_extraction

        # extract content
        postbody, temp_text, len_text = extract_content(cleaned_tree, options)

        # compare if necessary
        if no_fallback is False:
            postbody, temp_text, len_text = compare_extraction(cleaned_tree_backup, tree_backup_1, url, postbody, temp_text, len_text, options)
        # add baseline as additional fallback
        # rescue: try to use original/dirty tree # and favor_precision is False=?
        if len_text < config.getint('DEFAULT', 'MIN_EXTRACTED_SIZE'):
            postbody, temp_text, len_text = baseline(tree_backup_2)
            LOGGER.debug('non-clean extracted length: %s (extraction)', len_text)

        # tree size sanity check
        if max_tree_size is not None:
            # strip tags
            if len(postbody) > max_tree_size:
                LOGGER.debug('output tree too long: %s', len(postbody))
                strip_tags(postbody, 'hi')
            # still too long, raise an error
            if len(postbody) > max_tree_size:
                LOGGER.debug('output tree too long: %s, discarding file', len(postbody))
                raise ValueError
        # size checks
        if len_comments < config.getint('DEFAULT', 'MIN_EXTRACTED_COMM_SIZE'):
            LOGGER.debug('not enough comments %s', url)
        if len_text < config.getint('DEFAULT', 'MIN_OUTPUT_SIZE') and len_comments < config.getint('DEFAULT',
                                                                                                   'MIN_OUTPUT_COMM_SIZE'):
            LOGGER.debug('text and comments not long enough: %s %s', len_text, len_comments)
            raise ValueError

        # check duplicates at body level
        if deduplicate is True and duplicate_test(postbody, config) is True:
            LOGGER.debug('discarding duplicate document for URL %s', url)
            raise ValueError

        # sanity check on language
        if target_language is not None:
            is_not_target_lang, document = language_filter(temp_text, temp_comments, target_language, document)
            if is_not_target_lang is True:
                LOGGER.debug('wrong language for URL %s', url)
                raise ValueError

這裡主要檢查抽取結果:

  • 如果no_fallback為False,則使用compare_extraction函數比較兩個備份樹(cleaned_tree_backup和tree_backup_1),看那個結果更好
  • 如果提取的文字長度小於設定中的最小提取大小(MIN_EXTRACTED_SIZE),則嘗試使用原始tree解析,並更新postbody、temp_text和len_text。
  • 如果輸出樹的長度大於最大樹大小(max_tree_size),則刪除標籤以減小樹的大小。如果仍然太大,則引發錯誤。
  • 重複檢測, 如果deduplicate,會做duplicate_test,有重複內容的話會報錯

總結

Trafilatura 沒有采用類似GNE使用文字區塊密度的方式來確定正文的方案,用來比較多的xpath規則,覆蓋度還是不錯的,但是對於未覆蓋規則的部分,效果差強人意,需要做一些額外的處理。

可以優化的方向:

  • 和GNE的xpath規則,取並集,覆蓋國內主要新聞網站
  • 增加後處理,通過模型去除一些非正文部分