前言
前面一文介紹了通過基礎的web專案結構實現簡單的內容推薦,與其說那個是推薦不如說是一個排序演演算法。因為熱度計算方式雖然解決了內容的時效質量動態化。但是相對使用者而言,大家看到的都是幾乎一致的內容(不一樣也可能只是某時間裡某視訊的排前或靠後),沒有做到個性化的千人千面。
儘管如此,基於內容的熱度推薦依然有他獨特的應用場景——熱門榜單。所以只需要把這個功能換一個模組就可以了,將個性化推薦留給更擅長做這方面的演演算法。
當然了,做推薦系統的方法很多,平臺層面的像spark和今天要講的Surprise。方法層面可以用深度學習做,也可以用協同過濾,或綜合一起等等。大廠可能就更完善了,在召回階段就有很多通道,比如基於折積截幀識別視訊內容,文字相似度計算和現有資料支撐,後面又經過清洗,粗排,精排,重排等等流程,可能他們更多的是要保證平臺內容的多樣性。
那我們這裡依然走入門實際使用為主,能讓我們的專案快速對接上個性化推薦,以下就是在原因PHP專案結構上對接Surprise,實現使用者和物品的相似度推薦。
環境
- python3.8
- Flask2.0
- pandas2.0
- mysql-connector-python
- surprise
- openpyxl
- gunicorn
Surprise介紹
Surprise庫是一款用於構建和分析推薦系統的工具庫,他提供了多種推薦演演算法,包括基線演演算法、鄰域方法、基於矩陣分解的演演算法(如SVD、PMF、SVD++、NMF)等。內建了多種相似性度量方法,如餘弦相似性、均方差(MSD)、皮爾遜相關係數等。這些相似性度量方法可以用於評估使用者之間的相似性,從而為推薦系統提供重要的資料支援。
協同過濾資料集
既然要基於工具庫完成協同過濾推薦,自然就需要按該庫的標準進行。Surprise也和大多數協同過濾框架類似,資料集只需要有使用者對某個物品打分分值,如果自己沒有可以在網上下載免費的Movielens或Jester,以下是我根據業務建立的表格,自行參考。
業務介紹
web業務端通過介面或埋點,在使用者操作的地方根據預設的標準記錄評分記錄。當打分表有資料後,用python將SQL記錄轉為表格再匯入Surprise,根據不同的演演算法訓練,最後根據接收的引數返回對應的推薦top列表。python部分由Flask啟動的服務,與php進行http互動,後面將以片段程式碼說明。
編碼部分
1. PHP請求封裝
2. PHP發起推薦獲取
由於考慮到前期視訊存量不足,是採用協同過濾加熱度榜單結合的方式,前端獲取視訊推薦,介面返回視訊推薦列表的同時也帶了下次請求的標識(分頁碼)。這個分頁碼用於當協同過濾服務掛了或沒有推薦時,放在榜單列表的分頁。但是又要保證分頁數是否實際有效,所以當頁碼太大沒有資料返回就通過遞迴重置為第一頁,也把頁碼返回前端讓資料獲取更流暢。
public static function recommend($flag, $videoIds, $userId)
{
$nexFlag = $flag + 1;
$formatterVideoList = [];
try {
// 協同過濾推薦
$isOpen = config('api.video_recommend.is_open');
$cfVideoIds = [];
if ($isOpen == 1) {
$recommend = new Recommend($flag, $videoIds, $userId);
$recommend->addObserver(new mcf(15));
$recommend->addObserver(new icf(15));
$cfVideoIds = $recommend->startRecommend();
}
// 已讀視訊
$nowTime = strtotime(date('Ymd'));
$timeBefore = $nowTime - 60 * 60 * 24 * 100;
$videoIdsFilter = self::getUserVideoRatingByTime($userId, $timeBefore);
$cfVideoIds = array_diff($cfVideoIds, $videoIdsFilter);
// 違規視訊過濾
$videoPool = [];
$cfVideoIds && $videoPool = ShortVideoModel::listByOrderRaw($cfVideoIds, $flag);
// 冷啟動推薦
!$videoPool && $videoPool = self::hotRank($userId, $videoIdsFilter, $flag);
if ($videoPool) {
list($nexFlag, $videoList) = $videoPool;
$formatterVideoList = self::formatterVideoList($videoList, $userId);
}
} catch (\Exception $e) {
$preFileName = str::snake(__FUNCTION__);
$path = self::getClassName();
write_log("msg:" . $e->getMessage(), $preFileName . "_error", $path);
}
return [$nexFlag, $formatterVideoList];
}
3. 資料集生成
4. 協同過濾服務
5. 基於使用者推薦
# -*- coding: utf-8 -*-
# @File : cf_recommendation.py
from __future__ import (absolute_import, division, print_function,
unicode_literals)
from collections import defaultdict
import os
from surprise import Dataset
from surprise import Reader
from surprise import BaselineOnly
from surprise import KNNBasic
from surprise import KNNBaseline
from heapq import nlargest
import pandas as pd
import datetime
import time
def get_top_n(predictions, n=10):
top_n = defaultdict(list)
for uid, iid, true_r, est, _ in predictions:
top_n[uid].append((iid, est))
for uid, user_ratings in top_n.items():
top_n[uid] = nlargest(n, user_ratings, key=lambda s: s[1])
return top_n
class PredictionSet():
def __init__(self, algo, trainset, user_raw_id=None, k=40):
self.algo = algo
self.trainset = trainset
self.k = k
if user_raw_id is not None:
self.r_uid = user_raw_id
self.i_uid = trainset.to_inner_uid(user_raw_id)
self.knn_userset = self.algo.get_neighbors(self.i_uid, self.k)
user_items = set([j for (j, _) in self.trainset.ur[self.i_uid]])
self.neighbor_items = set()
for nnu in self.knn_userset:
for (j, _) in trainset.ur[nnu]:
if j not in user_items:
self.neighbor_items.add(j)
def user_build_anti_testset(self, fill=None):
fill = self.trainset.global_mean if fill is None else float(fill)
anti_testset = []
user_items = set([j for (j, _) in self.trainset.ur[self.i_uid]])
anti_testset += [(self.r_uid, self.trainset.to_raw_iid(i), fill) for
i in self.neighbor_items if
i not in user_items]
return anti_testset
def user_build_anti_testset(trainset, user_raw_id, fill=None):
fill = trainset.global_mean if fill is None else float(fill)
i_uid = trainset.to_inner_uid(user_raw_id)
anti_testset = []
user_items = set([j for (j, _) in trainset.ur[i_uid]])
anti_testset += [(user_raw_id, trainset.to_raw_iid(i), fill) for
i in trainset.all_items() if
i not in user_items]
return anti_testset
# ================= surprise 推薦部分 ====================
def collaborative_fitlering(raw_uid, top_k):
now = datetime.datetime.now()
year = now.year
month = now.month
day = now.day
fullDate = str(year) + str(month) + str(day)
dir_data = './collaborative_filtering/cf_excel'
file_path = '{}/dataset_{}.xlsx'.format(dir_data, fullDate)
if not os.path.exists(file_path):
raise IOError("Dataset file is not exists!")
# 讀取資料集#####################
alldata = pd.read_excel(file_path)
reader = Reader(line_format='user item rating')
dataset = Dataset.load_from_df(alldata, reader=reader)
# 所有資料生成訓練集
trainset = dataset.build_full_trainset()
# ================= BaselineOnly ==================
bsl_options = {'method': 'sgd', 'learning_rate': 0.0005}
algo_BaselineOnly = BaselineOnly(bsl_options=bsl_options)
algo_BaselineOnly.fit(trainset)
# 獲得推薦結果
rset = user_build_anti_testset(trainset, raw_uid)
# 測試休眠5秒,讓使用者端超時
# time.sleep(5)
# print(rset)
# exit()
predictions = algo_BaselineOnly.test(rset)
top_n_baselineonly = get_top_n(predictions, n=5)
# ================= KNNBasic ==================
sim_options = {'name': 'pearson', 'user_based': True}
algo_KNNBasic = KNNBasic(sim_options=sim_options)
algo_KNNBasic.fit(trainset)
# 獲得推薦結果 --- 只考慮 knn 使用者的
predictor = PredictionSet(algo_KNNBasic, trainset, raw_uid)
knn_anti_set = predictor.user_build_anti_testset()
predictions = algo_KNNBasic.test(knn_anti_set)
top_n_knnbasic = get_top_n(predictions, n=top_k)
# ================= KNNBaseline ==================
sim_options = {'name': 'pearson_baseline', 'user_based': True}
algo_KNNBaseline = KNNBaseline(sim_options=sim_options)
algo_KNNBaseline.fit(trainset)
# 獲得推薦結果 --- 只考慮 knn 使用者的
predictor = PredictionSet(algo_KNNBaseline, trainset, raw_uid)
knn_anti_set = predictor.user_build_anti_testset()
predictions = algo_KNNBaseline.test(knn_anti_set)
top_n_knnbaseline = get_top_n(predictions, n=top_k)
# =============== 按比例生成推薦結果 ==================
recommendset = set()
for results in [top_n_baselineonly, top_n_knnbasic, top_n_knnbaseline]:
for key in results.keys():
for recommendations in results[key]:
iid, rating = recommendations
recommendset.add(iid)
items_baselineonly = set()
for key in top_n_baselineonly.keys():
for recommendations in top_n_baselineonly[key]:
iid, rating = recommendations
items_baselineonly.add(iid)
items_knnbasic = set()
for key in top_n_knnbasic.keys():
for recommendations in top_n_knnbasic[key]:
iid, rating = recommendations
items_knnbasic.add(iid)
items_knnbaseline = set()
for key in top_n_knnbaseline.keys():
for recommendations in top_n_knnbaseline[key]:
iid, rating = recommendations
items_knnbaseline.add(iid)
rank = dict()
for recommendation in recommendset:
if recommendation not in rank:
rank[recommendation] = 0
if recommendation in items_baselineonly:
rank[recommendation] += 1
if recommendation in items_knnbasic:
rank[recommendation] += 1
if recommendation in items_knnbaseline:
rank[recommendation] += 1
max_rank = max(rank, key=lambda s: rank[s])
if max_rank == 1:
return list(items_baselineonly)
else:
result = nlargest(top_k, rank, key=lambda s: rank[s])
return list(result)
# print("排名結果: {}".format(result))
6. 基於物品推薦
其他
1. 推薦服務生產部署
開發環境下可以通過python recommend_service.py啟動,後面部署環境需要用到gunicorn,方式是安裝後設定環境變數。程式碼裡匯入werkzeug.middleware.proxy_fix, 修改以下的啟動部分以下內容,啟動改為gunicorn -w 5 -b 0.0.0.0:6016 app:app
app.wsgi_app = ProxyFix(app.wsgi_app)
app.run()
2. 模型本地儲存
隨著業務資料的累計,自然需要訓練的資料集也越來越大,所以後期關於模型訓練週期,可以縮短。也就是定時訓練模型後儲存到本地,然後根據線上的資料做出推薦,模型儲存與讀取方法如下。
2.1. 模型儲存
2.2. 模型讀取
寫在最後
上面的依然只是實現了推薦系統的一小部分,在做資料召回不管可以對視訊截幀還可以分離音訊,通過折積神經網路識別音訊種類和視訊大致內容。再根據使用者以往瀏覽記錄形成的標籤實現內容匹配等等,這個還要後期不斷學習和完善的。