Quần Cam

[Elixir RSS Reader] Phần 1 - HTTP client

Mở đầu

Vừa rồi, như các bạn chưa biết, mình đã chuyển toàn bộ bài viết kĩ thuật từ hqc.io sang Blog Quần Cam.

Tuy nhiên phát sinh từ nhu cầu thực tế, mình muốn các bài viết mới bên này vẫn hiển thị trên hqc.io. Cách dễ nhất là đọc RSS feed được cung cấp và hiển thị lại.

Không liên quan lắm, cơ mà loạt bài viết này mình sẽ chia sẻ lại từng bước cách mình viết một RSS reader với Elixir. Qua đó giúp các bạn mới và có ý định học Elixir có thể hiểu thêm về ngôn ngữ này.

Loạt bài viết này sẽ bao gồm 3 phần:

  1. HTTP Client - Viết một HTTP client để request RSS feed.
  2. Lưu các feed đã lấy vào Erlang Term Storage (ETS).
  3. Hiển thị các feed đã lấy lên web với Cowboy và Plug.

Để đọc bài viết này, bạn cần phải thoải mái với syntax và pattern matching của Elixir. Tuy vậy mình sẽ cố lồng ghép giải thích trong từng bài viết.

Nếu bạn mới bắt đầu với ngôn ngữ Elixir, mình sẽ giới thiệu bạn đọc loạt bài Elixir cho dân Ruby của anh Trung Lê, hoặc ElixirSchool (một dự án dịch sách đã hoàn thành của nhóm Elixir Vietnam).

Bây giờ ta sẽ bắt đầu vào việc nào.

Tạo project

Để tạo một project Elixir rất đơn giản, đảm bảo bạn đã cài Elixir. Sau đó chạy:

mix new rxx

Ta đã tạo xong project Rxx của chúng ta trong thư mục rxx.

GenServer

Một số câu hỏi rất thường được newbie hỏi khi mới bắt đầu với Elixir.

Q: Process trong Elixir/Erlang là gì?

A: Có thể bạn sẽ lẫn lộn nhưng process của Erlang không phải là UNIX process, nó chỉ vô tình trùng tên thôi. Bạn có thể hiểu chúng là các thread chạy riêng lẻ, và nhẹ trong máy ảo Erlang. Điều này giúp ta có thể tạo và dừng chúng dễ dàng.

Q: Khi nào tôi nên dùng process?

A: Câu trả lời thật ra khá đơn giản. Nếu bạn muốn giữ một state in time trong Erlang/Elixir, bạn chỉ có thể dùng process. Bạn muốn giữ connection pool? Bạn muốn giữ etag cuối cùng của request? Bạn bỏ chúng trong process.

Q: Khi nào thì tôi nên dùng GenServer?

A: GenServer giúp khái quát hóa cách ta tạo và giữ data trong process, đồng thời cung cấp thêm một số callbacks để giúp xử lý các thao tác async/sync.

Nếu bạn là người mới và bạn đọc hiểu hết cái Q&A. Chúc mừng! Bạn thuộc top 1% của nhân loại.

Còn không, chào mừng bạn đến với 99% còn lại, hãy đọc tiếp bài viết.

Lấy RSS feed

Về mặt thư viện ta sẽ sử dụng hackney để gửi http request và feed_ex để parse RSS feed.

Thêm hai thư viên này vào Mix file và chạy mix deps.get.

# mix.exs

defp deps do
  [
    {:feeder_ex, "~> 1.0"},
    {:hackney, "~> 1.8.6"},
  ]
end

Sau đó ta sẽ hiện thực hàm lấy RSS feed trong module Rxx.Feed.

# lib/rxx/feed.ex

defmodule Rxx.Feed do
  def fetch_feed(source, url) do
    req_headers = []

    case :hackney.get(url, req_headers) do
      {:ok, status, _resp_headers, ref} ->
        case read_body(status, ref) do
          {:ok, feed, _} -> {:ok, feed}
          {:error, _} = error -> error
        end
      {:error, _} = error -> error
    end
  end

  defp read_body(200, ref) do
    case :hackney.body(ref) do
      {:error, _} = error -> error
      {:ok, body} -> FeederEx.parse(body)
    end
  end
  defp read_body(status, ref) do
    :hackney.skip_body(ref)
    {:error, {:status, status}}
  end
end

Hàm fetch_body nhận vào hai tham số sourceurl. Ở đó, hackney sẽ gửi request và nếu thành công, một tuple dạng {:ok, status, response_header, ref} sẽ được trả về, còn không thì {:error, reason}. Đây là một convention dùng rất nhiều trong Elixir.

Với trường hợp thành công, ta sẽ đọc response body và parse nó nếu status code là 200, còn không trả về lỗi.

Mặc định hackney không đọc response body mà trả về cho bạn một ref, ref này sẽ được giữ cho đến khi app của bạn tắt. Để tránh memory leak, bạn nên hoặc đọc hoặc skip nó.

Dùng Etag để tận dụng cache

Tuy giờ ta đã gửi request ngon lành cành đào, nhưng ta sẽ tận dụng Etag header để tiết kiệm chút băng thông đắt đỏ, phòng một ngày cá mập cắn cáp.

Để làm việc này thì ta cần phải lưu lại etag của request cuối cùng. Chắc bạn đã nhớ lại được phần Q&A ở trên, để lưu state trong Elixir, ta dùng process.

Ta sẽ cho Rxx.Feed trở thành một GenServer và lần lượt hiện thực các callback cần thiết.

# lib/rxx/feed.ex

defmodule Rxx.Feed do
  use GenServer

  defstruct [:last_etag]

  def start_link(options) do
    GenServer.start_link(__MODULE__, options)
  end

  def init(_) do
    state = %__MODULE__{last_etag: nil}
    {:ok, state, 0}
  end
end

init là một callback khá quan trọng của GenServer. Khi một process được khởi tạo, init sẽ được gọi và bạn cần phải trả về cho hàm này một tuple bao gồm 2 thứ: {:ok, state}, không thì nó sẽ tuột quần bạn. State chính là dữ liệu mà ta muốn process giữ, ở đây là struct chứa etag.

Nếu bạn phát hiện mình trả dư số 0, nhưng quần mình sẽ vẫn còn nguyên đó, đó là vì ta sẽ dùng nó ở phần tiếp theo.

__MODULE__ là module hiện tại, ở đây là Rxx.Feed, ta cũng đã định nghĩa một struct cùng tên với module này bằng defstruct và có một thuộc tính là last_etag.

Thật ra không đúng khi nói rằng process sẽ giữ state, mà process sẽ pass state. Process sẽ “giữ” state bằng cách gửi và nhận state cho chính mình liên tục. Xem thêm Message Sending.

Để hỗ trợ etag thì ta phải sửa lại hàm fetch_feed của ta một chút.

def fetch_feed(source, url, last_etag) do
  req_headers = maybe_put_etag([], last_etag)

  case :hackney.get(url, req_headers) do
    {:ok, 304, _, ref} ->
      :ok = :hackney.skip_body(ref)
    {:ok, status, resp_headers, ref} ->
      resp_etag = maybe_fetch_header(resp_headers, "ETag")
      case read_body(status, ref) do
        {:ok, feed, _} ->
          {:ok, feed, resp_etag}
        {:error, _} = error -> error
      end
    {:error, _} = error -> error
  end
end

defp maybe_put_etag(headers, nil), do: headers
defp maybe_put_etag(headers, etag), do: [{"If-None-Match", etag} | headers]

Logic của HTTP caching khá đơn giản.

  • Ở lần request đầu tiên không có etag, server sẽ trả về etag, ta giữ nó.
  • Ở lần request tiếp theo, ta gửi etag đó qua header tên If-None-Match.

    • Nếu server trả về 304 Not Modified, không có update.
    • Nếu server trả về 200 OK với etag mới, ta parse RSS feed mới, và update lại etag.

Ta xử lý trả về cho hàm fetch_feed của chúng ta cho ứng với logic trên.

  • 304: ta trả về :ok.
  • 200: ta trả về {:ok, feed, new_etag}.

Chắc bạn sẽ đặt câu hỏi vậy etag được lưu ở đâu? Hãy đọc tiếp phần sau.

Lưu lại etag nhận được

Một chút về messaging trong GenServer

Có 3 loại message bạn có thể gửi tới một process, tương ứng với 3 loại handle_* callback trong GenServer.

1. cast một message bất đồng bộ. Khi bạn cast một message tới process, bạn không quan tâm tới sống chết của message đó ra sao. Ví dụ như log một thông tin ra ngoài màn hình.

2. call một message đồng bộ. Bạn quan tâm tới quả trả về. Ví dụ checkout một connection từ pool.

3. info, không thuộc cả hai dạng trên. Có thể là :timeout message như ở trên, hoặc các loại message gửi từ hệ thống.

Ở đây ta sẽ dùng hiện thực callback loại 3 là handle_info, nhưng ta sẽ làm việc với handle_casthandle_call ở các bài viết sau.

Áp dụng

Bạn nhớ số 0 mà mình trả dư trong tuple-ba-phần-tử ở trên? Thật ra đó là cách mà bạn báo cho process đang chạy tự gửi một message :timeout cho chính mình sau một khoảng thời gian, ở đây là 0, nghĩa là ngay và luôn.

# lib/rxx/feed.ex

defmodule Rxx.Feed do
  def handle_info(:timeout, state) do
    %__MODULE__{last_etag: last_etag} = state

    interval = _3_minutes = 300_000

    case fetch_feed("hqc.io", "https://hqc.io/rss", last_etag) do
      {:ok, feed, etag} ->
        {:noreply, %__MODULE__{state | last_etag: etag}, interval}
      :ok ->
        {:noreply, state, interval}
      {:error, reason} ->
        Logger.error("Could not fetch feed (source: #{source}), reason: #{inspect(reason)}")
        {:noreply, state, 60_000}
    end
  end
end

Theo contract, handle_info/2 sẽ nhận vào tên messagestate hiện tại, và cần trả về một tuple hai phần tử {:noreply, state} với state là state mới.

Ứng dụng hàm fetch_feed mà ta đã viết ở trên, ta xử lý lần lượt cho từng loại kết quả ta nhận được với pattern matching.

  • :ok: nghĩa là feed không đổi, ta giữ nguyên state và trả về, và hẹn giờ gửi timeout message sau 5 phút.
  • {:ok, feed, etag}: nghĩa là feed đã thay đổi, ta update etag vào state và trả về state mới.
  • {:error, _}: log lỗi, và gửi lại request sau 60 giây.

Như vậy cái luồng của process Rxx.Feed là như sau.

  1. Process được start, handle_info/2 sẽ được gọi.
  2. Nhận được feed đầu tiên và etag, process sẽ giữ nó lại.
  3. Hết 5 phút timeout, handle_info/2 lại được gọi, lần này nó sẽ dùng etag để kiểm tra feed có cập nhật hay không.
  4. Nếu không có cập nhật, nó chờ message tiếp theo sẽ tới trong vòng 5 phút.
  5. Nếu có cập nhật, quay lại bước 2.

Nếu bạn đang lo lắng về data race cho state, thì mặc cho ở ngoài máy ảo erlang process chạy tứ tung loạn xạ, bên trong nội tại mỗi process, chỉ có một message được thực thi tại một thời điểm và đó là một thao tác đồng bộ.

Let It Crash

Giờ thì bạn có thể bật iex lên và chạy feed của bạn bằng cách:

{:ok, pid} = Rxx.Feed.start_link()

Thế nhưng một số câu hỏi được đặt ra.

  1. Chẳng lẽ mỗi lần ta đều phải chạy bằng tay?
  2. Lỡ process của chúng ta bị crash thì làm sao để ta bật lại?

Và Elixir/Erlang OTP đã xuất hiện để giải cứu bạn.

# lib/rxx.ex

defmodule Rxx do
  use Application

  def start(_type, _args) do
    import Supervisor.Spec

    children = [
      worker(Rxx.Feed, [], id: Rxx.Feed),
    ]

    Supervisor.start_link(children, strategy: :one_for_one)
  end
end

Ở đây Rxx.Feed sẽ được bật cùng với app Rxx, do đã được thêm vào supervision tree. Và khi nó crash, Supervisor sẽ làm công việc đơn giản là bật nó lại, các process đang chạy khác không bị ảnh hưởng trong trường hợp này.

Tạm kết

Quả là một bài viết dài . Nếu các bạn đang thắc mắc: Ủa nãy giờ tui đâu thấy ông lưu mấy cái link của RSS chỗ nào đâu? thì hãy để ý tới bài tiếp theo nhé.

Người ta nói là đọc xong mà không bấm vào nút like với share thì tác giả tủi thân lắm.


NGUY HIỂM! KHU VỰC NHIỀU GIÓ!
Khuyến cáo giữ chặt bàn phím và lướt thật nhanh khi đi qua khu vực này.
Chức năng này hỗ trợ markdown và các thứ liên quan.

Bài viết cùng chủ đề

Elixir - Ngôn ngữ được viết bằng macros

Macro không những là một trong những viên gạch giúp bạn meta-program với Elixir, mà nó còn là công cụ giúp Elixir meta-program … chính nó nữa.

IO data và Vectored IO

Bài viết giới thiệu về IO data, Vectored I/O và tối ưu hóa hệ thống dùng Elixir bằng cách tận dụng Vectored I/O.

Poolboy và kĩ thuật pooling trong Erlang/Elixir