Implementing Short-Video Recommendation with Surprise Collaborative Filtering

2023-07-04 18:01:25

 Preface

        The previous article covered simple content recommendation built on a basic web project structure. Calling it recommendation is generous; it was really a ranking algorithm. The popularity formula did make content quality and freshness dynamic, but from the user's point of view everyone saw nearly the same feed (any difference was just a given video ranking higher or lower at a given moment). There was no personalization, no "thousand faces for a thousand users".

        Even so, content-based popularity recommendation still has a natural home: the trending chart. So that feature simply moves into its own module, and personalized recommendation is handed to algorithms better suited for it.

        There are, of course, many ways to build a recommender. At the platform level there is Spark, and the Surprise library covered today. On the method side you can use deep learning, collaborative filtering, or a blend of both. Large platforms go much further: the recall stage alone has many channels (recognizing video content from sampled frames with convolutional networks, text similarity, signals from existing data), followed by cleaning, coarse ranking, fine ranking, re-ranking, and so on, much of it aimed at keeping the platform's content diverse.

        Here we stay at the practical entry level, so the project can plug into personalized recommendation quickly. What follows integrates Surprise into the existing PHP project structure to deliver user-based and item-based similarity recommendations.

 

Environment

  • Python 3.8
  • Flask 2.0
  • pandas 2.0
  • mysql-connector-python
  • surprise (published on PyPI as scikit-surprise)
  • openpyxl
  • gunicorn

 

About Surprise

        Surprise is a library for building and analyzing recommender systems. It ships with a range of recommendation algorithms, including baseline estimators, neighborhood methods, and matrix-factorization models (SVD, PMF, SVD++, NMF), plus several built-in similarity measures such as cosine similarity, mean squared difference (MSD), and Pearson correlation. These similarity measures score how alike two users (or two items) are, which is the raw material a neighborhood-based recommender works from.
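
To get a feel for the API before the project code below, here is a minimal, self-contained sketch that trains KNNBasic with cosine similarity on the built-in MovieLens ml-100k dataset (Surprise offers to download it on first use) and cross-validates it:

from surprise import Dataset, KNNBasic
from surprise.model_selection import cross_validate

# ml-100k is downloaded automatically on first use
data = Dataset.load_builtin('ml-100k')

# cosine similarity between users; set 'user_based': False to compare items instead
algo = KNNBasic(sim_options={'name': 'cosine', 'user_based': True})

# 3-fold cross-validation, reporting RMSE and MAE
cross_validate(algo, data, measures=['RMSE', 'MAE'], cv=3, verbose=True)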

 

Collaborative filtering dataset

        Since we are building on the library, the data has to follow its conventions. Like most collaborative filtering frameworks, Surprise only needs triples of user, item, and the user's rating of that item. If you have no data of your own, the free MovieLens or Jester datasets can be downloaded online. Below is the table I created for this business; adapt it as you see fit.

CREATE TABLE `short_video_rating` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `user_id` varchar(120) DEFAULT '',
  `item_id` int(11) DEFAULT '0',
  `rating` int(11) unsigned DEFAULT '0' COMMENT 'rating score',
  `scoring_set` json DEFAULT NULL COMMENT 'set of scored behaviors',
  `create_time` int(11) DEFAULT '0',
  `action_day_time` int(11) DEFAULT '0' COMMENT 'start of the day of the last action',
  `update_time` int(11) DEFAULT '0' COMMENT 'update time',
  `delete_time` int(11) DEFAULT '0' COMMENT 'delete time',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=107 DEFAULT CHARSET=utf8mb4 COMMENT='user-to-video rating table';

 

Business overview

        The web side records a rating, via an API call or event tracking, wherever the user interacts, according to a preset scoring standard. Once the rating table has data, Python exports the SQL records to a spreadsheet and feeds it to Surprise, trains the various algorithms, and returns the top-N recommendation list for whatever parameters it receives. The Python side is a Flask service that PHP talks to over HTTP; the rest of this post walks through the code in fragments.
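
The scoring standard itself is business-specific and not shown in this post. Purely for illustration, a hypothetical mapping from behaviors to points could look like this (the behavior names and weights below are made up, not taken from the project):

# Hypothetical behavior-to-score weights; tune them to your own business rules.
BEHAVIOR_SCORES = {
    "play_finished": 3,   # watched to the end
    "liked": 2,
    "shared": 2,
    "skipped_early": -1,  # swiped away within a few seconds
}

def rating_from_behaviors(behaviors):
    """Aggregate one user's behaviors on one video into a single rating."""
    score = sum(BEHAVIOR_SCORES.get(b, 0) for b in behaviors)
    return max(0, score)  # the table stores rating as an unsigned int

print(rating_from_behaviors(["play_finished", "liked"]))  # 5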

 

The code

1. The PHP request wrapper

<?php
/**
 * Created by ZERO開發.
 * User: 北橋蘇
 * Date: 2023/6/26 0026
 * Time: 14:43
 */

namespace app\common\service;


class Recommend
{
    private $condition;

    private $cfRecommends = [];

    private $output = [];

    public function __construct($flag = 1, $lastRecommendIds = [], $userId = "")
    {
        $this->condition['flag'] = $flag;
        $this->condition['last_recommend_ids'] = $lastRecommendIds;
        $this->condition['user_id'] = $userId;
    }

    public function addObserver($cfRecommend)
    {
        $this->cfRecommends[] = $cfRecommend;
    }

    public function startRecommend()
    {
        foreach ($this->cfRecommends as $cfRecommend) {
            $res = $cfRecommend->recommend($this->condition);
            $this->output = array_merge($res, $this->output);
        }

        $this->output = array_values(array_unique($this->output));

        return $this->output;
    }
}


abstract class cfRecommendBase
{

    protected $cfGatewayUrl = "127.0.0.1:6016";
    protected $limit = 15;

    public function __construct($limit = 15)
    {
        $this->limit = $limit;
        $this->cfGatewayUrl = config('api.video_recommend.gateway_url');
    }

    abstract public function recommend($condition);
}


// mcf: asks the Python service's /mcf_recommend endpoint (mixed, user-side CF)
class mcf extends cfRecommendBase
{
    public function recommend($condition)
    {
        $videoIdArr = [];

        $flag = $condition['flag'] ?? 1;
        $userId = $condition['user_id'] ?? '';
        $url = "{$this->cfGatewayUrl}/mcf_recommend";

        // only the first page of a session asks for user-based recommendations
        if ($flag == 1 && $userId) {
            $param['raw_uid'] = (string)$userId;
            $param['top_k'] = $this->limit;

            $list = httpRequest($url, $param, 'json');
            $videoIdArr = json_decode($list, true) ?? [];
        }

        return $videoIdArr;
    }
}


// icf: asks /icf_recommend for items similar to the last recommended video
class icf extends cfRecommendBase
{
    public function recommend($condition)
    {
        $videoIdArr = [];

        $flag = $condition['flag'] ?? 1;
        $userId = $condition['user_id'] ?? '';
        $lastRecommendIds = $condition['last_recommend_ids'] ?? [];
        $url = "{$this->cfGatewayUrl}/icf_recommend";

        // from the second page on, recommend by similarity to the last shown item
        if ($flag > 1 && $lastRecommendIds && $userId) {
            $itemId = $lastRecommendIds[0] ?? 0;
            $param['raw_item_id'] = $itemId;
            $param['top_k'] = $this->limit;

            $list = httpRequest($url, $param, 'json');
            $videoIdArr = json_decode($list, true) ?? [];
        }

        return $videoIdArr;
    }
}

2. PHP requesting recommendations

Because the video inventory is thin early on, collaborative filtering is combined with the popularity chart. When the front end asks for recommendations, the API returns the list along with a marker for the next request (a page number). That page number drives paging through the chart list whenever the CF service is down or returns nothing. The page number also has to stay valid: if it grows too large and no data comes back, the code recursively resets it to page one and returns the page number to the front end, so fetching stays seamless.

 

public static function recommend($flag, $videoIds, $userId)
{
    $nextFlag = $flag + 1;
    $formatterVideoList = [];

    try {
        // collaborative filtering recommendations
        $isOpen = config('api.video_recommend.is_open');
        $cfVideoIds = [];
        if ($isOpen == 1) {
            $recommend = new Recommend($flag, $videoIds, $userId);
            $recommend->addObserver(new mcf(15));
            $recommend->addObserver(new icf(15));
            $cfVideoIds = $recommend->startRecommend();
        }

        // drop videos the user already rated within the last 100 days
        $nowTime = strtotime(date('Ymd'));
        $timeBefore = $nowTime - 60 * 60 * 24 * 100;
        $videoIdsFilter = self::getUserVideoRatingByTime($userId, $timeBefore);
        $cfVideoIds = array_diff($cfVideoIds, $videoIdsFilter);

        // filter out videos that violate platform rules
        $videoPool = [];
        $cfVideoIds && $videoPool = ShortVideoModel::listByOrderRaw($cfVideoIds, $flag);

        // cold start: fall back to the popularity chart
        !$videoPool && $videoPool = self::hotRank($userId, $videoIdsFilter, $flag);

        if ($videoPool) {
            list($nextFlag, $videoList) = $videoPool;
            $formatterVideoList = self::formatterVideoList($videoList, $userId);
        }
    } catch (\Exception $e) {
        $preFileName = str::snake(__FUNCTION__);
        $path = self::getClassName();
        write_log("msg:" . $e->getMessage(), $preFileName . "_error", $path);
    }

    return [$nextFlag, $formatterVideoList];
}

3. Generating the dataset

import os
import datetime

import mysql.connector
import pandas as pd

# Name the dataset file by day, e.g. dataset_20230704.xlsx. strftime zero-pads,
# so 2023-11-02 and 2023-01-12 cannot collide the way raw string concatenation could.
full_date = datetime.datetime.now().strftime('%Y%m%d')

dir_data = './collaborative_filtering/cf_excel'
file_path = '{}/dataset_{}.xlsx'.format(dir_data, full_date)
db_config = {
    "host": "127.0.0.1",
    "database": "database",
    "user": "user",
    "password": "password"
}

if not os.path.exists(file_path):
    os.makedirs(dir_data, exist_ok=True)
    cnx = mysql.connector.connect(user=db_config['user'], password=db_config['password'],
                                  host=db_config['host'], database=db_config['database'])
    try:
        # Surprise only needs (user, item, rating) triples
        df = pd.read_sql_query("SELECT user_id, item_id, rating FROM short_video_rating", cnx)
    finally:
        cnx.close()

    print('---------------writing dataset----------------')

    # write the data frame to an Excel file (requires openpyxl)
    df.to_excel(file_path, index=False)

if not os.path.exists(file_path):
    raise IOError("Dataset file does not exist!")


4. The collaborative filtering service

import os

from flask import Flask, request, json, Response, abort
from collaborative_filtering import cf_item
from collaborative_filtering import cf_user
from collaborative_filtering import cf_mix
from werkzeug.middleware.proxy_fix import ProxyFix  # used for the gunicorn deployment below

app = Flask(__name__)

@app.route('/')
def hello_world():
    return abort(404)

# mixed strategy: user-side recommendations for a raw user id
@app.route('/mcf_recommend', methods=["POST", "GET"])
def get_mcf_recommendation():
    json_data = request.get_json()

    raw_uid = json_data.get("raw_uid")
    top_k = json_data.get("top_k")

    recommend_result = cf_mix.collaborative_filtering(raw_uid, top_k)

    return Response(json.dumps(recommend_result), mimetype='application/json')

# user-based recommendations
@app.route('/ucf_recommend', methods=["POST", "GET"])
def get_ucf_recommendation():
    json_data = request.get_json()

    raw_uid = json_data.get("raw_uid")
    top_k = json_data.get("top_k")

    recommend_result = cf_user.collaborative_filtering(raw_uid, top_k)

    return Response(json.dumps(recommend_result), mimetype='application/json')

# item-based recommendations: neighbors of a raw item id
@app.route('/icf_recommend', methods=["POST", "GET"])
def get_icf_recommendation():
    json_data = request.get_json()

    raw_item_id = json_data.get("raw_item_id")
    top_k = json_data.get("top_k")

    recommend_result = cf_item.collaborative_filtering(raw_item_id, top_k)

    return Response(json.dumps(recommend_result), mimetype='application/json')

if __name__ == '__main__':
    app.run(host="0.0.0.0",
            debug=True,
            port=6016
            )
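
The contract between PHP and this service is plain JSON over HTTP. For reference, here is the same call the PHP wrapper makes, expressed with Python's requests library (the user id is a placeholder):

import requests

resp = requests.post(
    "http://127.0.0.1:6016/mcf_recommend",
    json={"raw_uid": "10001", "top_k": 15},  # "10001": hypothetical user id
    timeout=3,
)
print(resp.json())  # a JSON list of recommended video ids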

5. User-based recommendation

# -*- coding: utf-8 -*-
# @File    : cf_recommendation.py
from collections import defaultdict
from heapq import nlargest
import datetime
import os
import time  # used by the commented-out timeout test below

from surprise import Dataset, Reader, BaselineOnly, KNNBasic, KNNBaseline
import pandas as pd

def get_top_n(predictions, n=10):
    """Map each user id to its n (item, estimated rating) pairs, best first."""
    top_n = defaultdict(list)
    for uid, iid, true_r, est, _ in predictions:
        top_n[uid].append((iid, est))

    for uid, user_ratings in top_n.items():
        top_n[uid] = nlargest(n, user_ratings, key=lambda s: s[1])

    return top_n

class PredictionSet():
    """Collect a user's k nearest neighbors and the items those neighbors
    rated that the user has not, so predictions only cover that candidate set."""

    def __init__(self, algo, trainset, user_raw_id=None, k=40):
        self.algo = algo
        self.trainset = trainset
        self.k = k
        if user_raw_id is not None:
            self.r_uid = user_raw_id
            self.i_uid = trainset.to_inner_uid(user_raw_id)
            self.knn_userset = self.algo.get_neighbors(self.i_uid, self.k)
            user_items = set([j for (j, _) in self.trainset.ur[self.i_uid]])
            self.neighbor_items = set()
            for nnu in self.knn_userset:
                for (j, _) in trainset.ur[nnu]:
                    if j not in user_items:
                        self.neighbor_items.add(j)

    def user_build_anti_testset(self, fill=None):
        """(user, item, fill) triples for the neighbor items the user has not rated."""
        fill = self.trainset.global_mean if fill is None else float(fill)

        anti_testset = []
        user_items = set([j for (j, _) in self.trainset.ur[self.i_uid]])
        anti_testset += [(self.r_uid, self.trainset.to_raw_iid(i), fill) for
                         i in self.neighbor_items if
                         i not in user_items]
        return anti_testset

def user_build_anti_testset(trainset, user_raw_id, fill=None):
    """(user, item, global-mean) triples for every item the user has not rated."""
    fill = trainset.global_mean if fill is None else float(fill)

    i_uid = trainset.to_inner_uid(user_raw_id)

    user_items = set([j for (j, _) in trainset.ur[i_uid]])

    anti_testset = [(user_raw_id, trainset.to_raw_iid(i), fill) for
                    i in trainset.all_items() if
                    i not in user_items]

    return anti_testset


# ================= Surprise recommendation ====================
def collaborative_filtering(raw_uid, top_k):

    # dataset file for today, e.g. dataset_20230704.xlsx (matches the export script)
    full_date = datetime.datetime.now().strftime('%Y%m%d')

    dir_data = './collaborative_filtering/cf_excel'
    file_path = '{}/dataset_{}.xlsx'.format(dir_data, full_date)

    if not os.path.exists(file_path):
        raise IOError("Dataset file does not exist!")

    # load the dataset; columns must be ordered user, item, rating
    alldata = pd.read_excel(file_path)

    # when loading from a DataFrame only rating_scale is used; (1, 5) is the default
    reader = Reader(rating_scale=(1, 5))
    dataset = Dataset.load_from_df(alldata, reader=reader)

    # build the training set from all data (no train/test split)
    trainset = dataset.build_full_trainset()

    # ================= BaselineOnly ==================
    bsl_options = {'method': 'sgd', 'learning_rate': 0.0005}
    algo_BaselineOnly = BaselineOnly(bsl_options=bsl_options)
    algo_BaselineOnly.fit(trainset)

    # predict over every item the user has not rated
    rset = user_build_anti_testset(trainset, raw_uid)

    # sleep 5 s to test client-side timeouts
    # time.sleep(5)

    predictions = algo_BaselineOnly.test(rset)
    top_n_baselineonly = get_top_n(predictions, n=5)

    # ================= KNNBasic ==================
    sim_options = {'name': 'pearson', 'user_based': True}
    algo_KNNBasic = KNNBasic(sim_options=sim_options)
    algo_KNNBasic.fit(trainset)

    # predict only over the kNN neighborhood's items
    predictor = PredictionSet(algo_KNNBasic, trainset, raw_uid)
    knn_anti_set = predictor.user_build_anti_testset()
    predictions = algo_KNNBasic.test(knn_anti_set)
    top_n_knnbasic = get_top_n(predictions, n=top_k)

    # ================= KNNBaseline ==================
    sim_options = {'name': 'pearson_baseline', 'user_based': True}
    algo_KNNBaseline = KNNBaseline(sim_options=sim_options)
    algo_KNNBaseline.fit(trainset)

    # predict only over the kNN neighborhood's items
    predictor = PredictionSet(algo_KNNBaseline, trainset, raw_uid)
    knn_anti_set = predictor.user_build_anti_testset()
    predictions = algo_KNNBaseline.test(knn_anti_set)
    top_n_knnbaseline = get_top_n(predictions, n=top_k)

    # =============== Merge the three result sets by vote ==================
    # e.g. an item recommended by KNNBasic and KNNBaseline but not by
    # BaselineOnly ends up with 2 votes
    recommendset = set()
    for results in [top_n_baselineonly, top_n_knnbasic, top_n_knnbaseline]:
        for key in results.keys():
            for iid, rating in results[key]:
                recommendset.add(iid)

    items_baselineonly = set(iid for recs in top_n_baselineonly.values() for iid, _ in recs)
    items_knnbasic = set(iid for recs in top_n_knnbasic.values() for iid, _ in recs)
    items_knnbaseline = set(iid for recs in top_n_knnbaseline.values() for iid, _ in recs)

    rank = dict()
    for recommendation in recommendset:
        rank[recommendation] = 0
        if recommendation in items_baselineonly:
            rank[recommendation] += 1
        if recommendation in items_knnbasic:
            rank[recommendation] += 1
        if recommendation in items_knnbaseline:
            rank[recommendation] += 1

    # The original code compared the *item id* holding the most votes against 1;
    # what is wanted is the maximum vote count itself.
    max_votes = max(rank.values()) if rank else 0
    if max_votes <= 1:
        # the algorithms agree on nothing: fall back to the baseline list
        return list(items_baselineonly)

    # otherwise return the top_k items with the most votes
    return nlargest(top_k, rank, key=lambda s: rank[s])

6. Item-based recommendation

# -*- coding: utf-8 -*-
import datetime
import os
import pickle  # used by the commented-out model save/load below

from surprise import KNNBaseline, Reader, Dataset
import pandas as pd

# ================= Surprise recommendation ====================
def collaborative_filtering(raw_item_id, top_k):

    # dataset file for today, e.g. dataset_20230704.xlsx (matches the export script)
    full_date = datetime.datetime.now().strftime('%Y%m%d')

    # dir_data = './collaborative_filtering/cf_excel'  # path when run via the Flask service
    dir_data = './cf_excel'
    file_path = '{}/dataset_{}.xlsx'.format(dir_data, full_date)

    if not os.path.exists(file_path):
        raise IOError("Dataset file does not exist!")

    # load the dataset; columns must be ordered user, item, rating
    alldata = pd.read_excel(file_path)

    # when loading from a DataFrame only rating_scale is used; (1, 5) is the default
    reader = Reader(rating_scale=(1, 5))
    dataset = Dataset.load_from_df(alldata, reader=reader)

    # Required: fit on the whole dataset (no cross-validation); this builds the rating matrix
    trainset = dataset.build_full_trainset()

    # similarity: pearson_baseline; 'user_based': False makes this item-based CF
    sim_options = {'name': 'pearson_baseline', 'user_based': False}
    algo = KNNBaseline(sim_options=sim_options)
    algo.fit(trainset)

    # serialize the trained model to disk
    # with open('./cf_models/cf_item_model.pkl', 'wb') as f:
    #     pickle.dump(algo, f)

    # load a previously trained model from disk
    # with open('cf_item_model.pkl', 'rb') as f:
    #     algo = pickle.load(f)

    # convert the raw item id to Surprise's inner id
    item_inner_id = algo.trainset.to_inner_iid(raw_item_id)
    # find the top_k nearest neighbors by inner id
    item_neighbors = algo.get_neighbors(item_inner_id, k=top_k)
    # convert the neighbors' inner ids back to raw item ids
    neighbor_raw_ids = (algo.trainset.to_raw_iid(inner_id) for inner_id in item_neighbors)

    return list(neighbor_raw_ids)


if __name__ == "__main__":
    res = collaborative_filtering(15, 20)
    print(res)

 

Other notes

1. Deploying the recommendation service

In development the service can be started with python recommend_service.py. For deployment, use gunicorn: install it and set up the environment variables, keep the werkzeug.middleware.proxy_fix import in the code, replace the startup section with the lines below (ProxyFix must be applied at module level so gunicorn picks it up, while app.run() stays under the __main__ guard), and launch with gunicorn -w 5 -b 0.0.0.0:6016 app:app.

app.wsgi_app = ProxyFix(app.wsgi_app)  # module level, so gunicorn honors X-Forwarded-* headers

if __name__ == '__main__':
    app.run()

2. Saving the model locally

As business data accumulates, the dataset to train on keeps growing, so later on training can be decoupled from serving and its cycle shortened: train the model on a schedule, save it to disk, and serve recommendations from the stored model against live requests. Saving and loading work as follows.

2.1. Saving the model

sim_options = {'name': 'pearson_baseline', 'user_based': False}
algo = KNNBaseline(sim_options=sim_options)
algo.fit(trainset)

# serialize the trained model to disk
with open('./cf_models/cf_item_model.pkl', 'wb') as f:
    pickle.dump(algo, f)

2.2. Loading the model

    # inside collaborative_filtering, the training step is replaced by loading:
    with open('cf_item_model.pkl', 'rb') as f:
        algo = pickle.load(f)

    # convert the raw item id to Surprise's inner id
    item_inner_id = algo.trainset.to_inner_iid(raw_item_id)
    # find the top_k nearest neighbors by inner id
    item_neighbors = algo.get_neighbors(item_inner_id, k=top_k)
    # convert the neighbors' inner ids back to raw item ids
    neighbor_raw_ids = (algo.trainset.to_raw_iid(inner_id) for inner_id in item_neighbors)

    return list(neighbor_raw_ids)
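
Putting the two halves together, here is a minimal sketch of a scheduled training job (the train_and_save name and default paths are my own, not from the project; run it from cron or any scheduler before the service takes traffic):

import datetime
import os
import pickle

import pandas as pd
from surprise import Dataset, KNNBaseline, Reader

def train_and_save(dataset_dir='./cf_excel', model_path='./cf_models/cf_item_model.pkl'):
    """Train the item-based model on today's dataset and pickle it to disk."""
    full_date = datetime.datetime.now().strftime('%Y%m%d')
    file_path = '{}/dataset_{}.xlsx'.format(dataset_dir, full_date)

    alldata = pd.read_excel(file_path)
    dataset = Dataset.load_from_df(alldata, reader=Reader(rating_scale=(1, 5)))
    trainset = dataset.build_full_trainset()

    algo = KNNBaseline(sim_options={'name': 'pearson_baseline', 'user_based': False})
    algo.fit(trainset)

    os.makedirs(os.path.dirname(model_path), exist_ok=True)
    with open(model_path, 'wb') as f:
        pickle.dump(algo, f)

if __name__ == '__main__':
    train_and_save()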

Closing thoughts

       The above still implements only a small slice of a recommender system. On the recall side, besides sampling frames from the video you can also separate out the audio track, use convolutional neural networks to classify the audio and the rough content of the video, and then match content against the tags built up from each user's viewing history, and so on. That part will take continued learning and refinement.