構建一個即時訊息應用(三):對話

2020-03-30 19:40:00

本文是該系列的第三篇。

在我們的即時訊息應用中,訊息表現為兩個參與者對話的堆疊。如果你想要開始一場對話,就應該向應用提供你想要交談的使用者,而當對話建立後(如果該對話此前並不存在),就可以向該對話傳送訊息。

就前端而言,我們可能想要顯示一份近期對話列表。並在此處顯示對話的最後一條訊息以及另一個參與者的姓名和頭像。

在這篇貼文中,我們將會編寫一些端點endpoint來完成像“建立對話”、“獲取對話列表”以及“找到單個對話”這樣的任務。

首先,要在主函數 main() 中新增下面的路由。

router.HandleFunc("POST", "/api/conversations", requireJSON(guard(createConversation)))router.HandleFunc("GET", "/api/conversations", guard(getConversations))router.HandleFunc("GET", "/api/conversations/:conversationID", guard(getConversation))

這三個端點都需要進行身份驗證,所以我們將會使用 guard() 中介軟體。我們也會構建一個新的中介軟體,用於檢查請求內容是否為 JSON 格式。

JSON 請求檢查中介軟體

func requireJSON(handler http.HandlerFunc) http.HandlerFunc {    return func(w http.ResponseWriter, r *http.Request) {        if ct := r.Header.Get("Content-Type"); !strings.HasPrefix(ct, "application/json") {            http.Error(w, "Content type of application/json required", http.StatusUnsupportedMediaType)            return        }        handler(w, r)    }}

如果請求request不是 JSON 格式,那麼它會返回 415 Unsupported Media Type(不支援的媒體型別)錯誤。

建立對話

type Conversation struct {    ID                string   `json:"id"`    OtherParticipant  *User    `json:"otherParticipant"`    LastMessage       *Message `json:"lastMessage"`    HasUnreadMessages bool     `json:"hasUnreadMessages"`}

就像上面的程式碼那樣,對話中保持對另一個參與者和最後一條訊息的參照,還有一個 bool 型別的欄位,用來告知是否有未讀訊息。

type Message struct {    ID             string    `json:"id"`    Content        string    `json:"content"`    UserID         string    `json:"-"`    ConversationID string    `json:"conversationID,omitempty"`    CreatedAt      time.Time `json:"createdAt"`    Mine           bool      `json:"mine"`    ReceiverID     string    `json:"-"`}

我們會在下一篇文章介紹與訊息相關的內容,但由於我們這裡也需要用到它,所以先定義了 Message 結構體。其中大多數位段與資料庫表一致。我們需要使用 Mine 來斷定訊息是否屬於當前已驗證使用者所有。一旦加入實時功能,ReceiverID 可以幫助我們過濾訊息。

接下來讓我們編寫 HTTP 處理程式。儘管它有些長,但也沒什麼好怕的。

func createConversation(w http.ResponseWriter, r *http.Request) {    var input struct {        Username string `json:"username"`    }    defer r.Body.Close()    if err := json.NewDecoder(r.Body).Decode(&input); err != nil {        http.Error(w, err.Error(), http.StatusBadRequest)        return    }    input.Username = strings.TrimSpace(input.Username)    if input.Username == "" {        respond(w, Errors{map[string]string{            "username": "Username required",        }}, http.StatusUnprocessableEntity)        return    }    ctx := r.Context()    authUserID := ctx.Value(keyAuthUserID).(string)    tx, err := db.BeginTx(ctx, nil)    if err != nil {        respondError(w, fmt.Errorf("could not begin tx: %v", err))        return    }    defer tx.Rollback()    var otherParticipant User    if err := tx.QueryRowContext(ctx, `        SELECT id, avatar_url FROM users WHERE username = $1    `, input.Username).Scan(        &otherParticipant.ID,        &otherParticipant.AvatarURL,    ); err == sql.ErrNoRows {        http.Error(w, "User not found", http.StatusNotFound)        return    } else if err != nil {        respondError(w, fmt.Errorf("could not query other participant: %v", err))        return    }    otherParticipant.Username = input.Username    if otherParticipant.ID == authUserID {        http.Error(w, "Try start a conversation with someone else", http.StatusForbidden)        return    }    var conversationID string    if err := tx.QueryRowContext(ctx, `        SELECT conversation_id FROM participants WHERE user_id = $1        INTERSECT        SELECT conversation_id FROM participants WHERE user_id = $2    `, authUserID, otherParticipant.ID).Scan(&conversationID); err != nil && err != sql.ErrNoRows {        respondError(w, fmt.Errorf("could not query common conversation id: %v", err))        return    } else if err == nil {        http.Redirect(w, r, "/api/conversations/"+conversationID, http.StatusFound)        return    }    var conversation Conversation    if err = tx.QueryRowContext(ctx, `        INSERT INTO conversations DEFAULT VALUES        RETURNING id    `).Scan(&conversation.ID); err != nil {        respondError(w, fmt.Errorf("could not insert conversation: %v", err))        return    }    if _, err = tx.ExecContext(ctx, `        INSERT INTO participants (user_id, conversation_id) VALUES            ($1, $2),            ($3, $2)    `, authUserID, conversation.ID, otherParticipant.ID); err != nil {        respondError(w, fmt.Errorf("could not insert participants: %v", err))        return    }    if err = tx.Commit(); err != nil {        respondError(w, fmt.Errorf("could not commit tx to create conversation: %v", err))        return    }    conversation.OtherParticipant = &otherParticipant    respond(w, conversation, http.StatusCreated)}

在此端點,你會向 /api/conversations 傳送 POST 請求,請求的 JSON 主體中包含要對話的使用者的使用者名稱。

因此,首先需要將請求主體解析成包含使用者名稱的結構。然後,校驗使用者名稱不能為空。

type Errors struct {    Errors map[string]string `json:"errors"`}

這是錯誤訊息的結構體 Errors,它僅僅是一個對映。如果輸入空使用者名稱,你就會得到一段帶有 422 Unprocessable Entity(無法處理的實體)錯誤訊息的 JSON 。

{    "errors": {        "username": "Username required"    }}

然後,我們開始執行 SQL 事務。收到的僅僅是使用者名稱,但事實上,我們需要知道實際的使用者 ID 。因此,事務的第一項內容是查詢另一個參與者的 ID 和頭像。如果找不到該使用者,我們將會返回 404 Not Found(未找到) 錯誤。另外,如果找到的使用者恰好和“當前已驗證使用者”相同,我們應該返回 403 Forbidden(拒絕處理)錯誤。這是由於對話只應當在兩個不同的使用者之間發起,而不能是同一個。

然後,我們試圖找到這兩個使用者所共有的對話,所以需要使用 INTERSECT 語句。如果存在,只需要通過 /api/conversations/{conversationID} 重定向到該對話並將其返回。

如果未找到共有的對話,我們需要建立一個新的對話並新增指定的兩個參與者。最後,我們 COMMIT 該事務並使用新建立的對話進行響應。

獲取對話列表

端點 /api/conversations 將獲取當前已驗證使用者的所有對話。

func getConversations(w http.ResponseWriter, r *http.Request) {    ctx := r.Context()    authUserID := ctx.Value(keyAuthUserID).(string)    rows, err := db.QueryContext(ctx, `        SELECT            conversations.id,            auth_user.messages_read_at < messages.created_at AS has_unread_messages,            messages.id,            messages.content,            messages.created_at,            messages.user_id = $1 AS mine,            other_users.id,            other_users.username,            other_users.avatar_url        FROM conversations        INNER JOIN messages ON conversations.last_message_id = messages.id        INNER JOIN participants other_participants            ON other_participants.conversation_id = conversations.id                AND other_participants.user_id != $1        INNER JOIN users other_users ON other_participants.user_id = other_users.id        INNER JOIN participants auth_user            ON auth_user.conversation_id = conversations.id                AND auth_user.user_id = $1        ORDER BY messages.created_at DESC    `, authUserID)    if err != nil {        respondError(w, fmt.Errorf("could not query conversations: %v", err))        return    }    defer rows.Close()    conversations := make([]Conversation, 0)    for rows.Next() {        var conversation Conversation        var lastMessage Message        var otherParticipant User        if err = rows.Scan(            &conversation.ID,            &conversation.HasUnreadMessages,            &lastMessage.ID,            &lastMessage.Content,            &lastMessage.CreatedAt,            &lastMessage.Mine,            &otherParticipant.ID,            &otherParticipant.Username,            &otherParticipant.AvatarURL,        ); err != nil {            respondError(w, fmt.Errorf("could not scan conversation: %v", err))            return        }        conversation.LastMessage = &lastMessage        conversation.OtherParticipant = &otherParticipant        conversations = append(conversations, conversation)    }    if err = rows.Err(); err != nil {        respondError(w, fmt.Errorf("could not iterate over conversations: %v", err))        return    }    respond(w, conversations, http.StatusOK)}

該處理程式僅對資料庫進行查詢。它通過一些聯接來查詢對話表……首先,從訊息表中獲取最後一條訊息。然後依據“ID 與當前已驗證使用者不同”的條件,從參與者表找到對話的另一個參與者。然後聯接到使用者表以獲取該使用者的使用者名稱和頭像。最後,再次聯接參與者表,並以相反的條件從該表中找出參與對話的另一個使用者,其實就是當前已驗證使用者。我們會對比訊息中的 messages_read_atcreated_at 兩個欄位,以確定對話中是否存在未讀訊息。然後,我們通過 user_id 欄位來判定該訊息是否屬於“我”(指當前已驗證使用者)。

注意,此查詢過程假定對話中只有兩個使用者參與,它也僅僅適用於這種情況。另外,該設計也不很適用於需要顯示未讀訊息數量的情況。如果需要顯示未讀訊息的數量,我認為可以在 participants 表上新增一個unread_messages_count INT 欄位,並在每次建立新訊息的時候遞增它,如果使用者已讀則重置該欄位。

接下來需要遍歷每一條記錄,通過掃描每一個存在的對話來建立一個對話切片slice of conversations並在最後進行響應。

找到單個對話

端點 /api/conversations/{conversationID} 會根據 ID 對單個對話進行響應。

func getConversation(w http.ResponseWriter, r *http.Request) {    ctx := r.Context()    authUserID := ctx.Value(keyAuthUserID).(string)    conversationID := way.Param(ctx, "conversationID")    var conversation Conversation    var otherParticipant User    if err := db.QueryRowContext(ctx, `        SELECT            IFNULL(auth_user.messages_read_at < messages.created_at, false) AS has_unread_messages,            other_users.id,            other_users.username,            other_users.avatar_url        FROM conversations        LEFT JOIN messages ON conversations.last_message_id = messages.id        INNER JOIN participants other_participants            ON other_participants.conversation_id = conversations.id                AND other_participants.user_id != $1        INNER JOIN users other_users ON other_participants.user_id = other_users.id        INNER JOIN participants auth_user            ON auth_user.conversation_id = conversations.id                AND auth_user.user_id = $1        WHERE conversations.id = $2    `, authUserID, conversationID).Scan(        &conversation.HasUnreadMessages,        &otherParticipant.ID,        &otherParticipant.Username,        &otherParticipant.AvatarURL,    ); err == sql.ErrNoRows {        http.Error(w, "Conversation not found", http.StatusNotFound)        return    } else if err != nil {        respondError(w, fmt.Errorf("could not query conversation: %v", err))        return    }    conversation.ID = conversationID    conversation.OtherParticipant = &otherParticipant    respond(w, conversation, http.StatusOK)}

這裡的查詢與之前有點類似。儘管我們並不關心最後一條訊息的顯示問題,並因此忽略了與之相關的一些欄位,但是我們需要根據這條訊息來判斷對話中是否存在未讀訊息。此時,我們使用 LEFT JOIN 來代替 INNER JOIN,因為 last_message_id 欄位是 NULLABLE(可以為空)的;而其他情況下,我們無法得到任何記錄。基於同樣的理由,我們在 has_unread_messages 的比較中使用了 IFNULL 語句。最後,我們按 ID 進行過濾。

如果查詢沒有返回任何記錄,我們的響應會返回 404 Not Found 錯誤,否則響應將會返回 200 OK 以及找到的對話。


本篇貼文以建立了一些對話端點結束。

在下一篇貼文中,我們將會看到如何建立並列出訊息。