Temporal Typescript SDK 學習筆記

Hi 大家好,這篇是紀錄學習基礎 Temporal Typescript SDK 的介紹與筆記(Temporal 支援多種語言使用),旨在快速學習與上手理解,雖然整理完文章還是很長 XDDDDD(學到崩潰

詳細介紹建議查看官方文件會更清楚喔,本篇是個人閱讀與前輩指點整理而成,感謝前輩指點迷津帶我飛~

Temporal 是什麼?

Temporal 是一個分布式、易擴展、持久且高度可用的工作編排引擎(workflow engine),用統一的 API 替許多日常的定時工作、排期工作進行編排自動化的長時間運行業務邏輯(比如訂閱扣款、爬蟲資料刷新等等)。

其中以五個概念為核心: Task, Activity, Workflow, Worker, Client

Task

temporal 中的 taskQuene 中包含兩個 quene,workflowTaskQuene, activityTaskQuene,兩者分別存放對應的 task,所以 task 也分為 workflowTask, activityTask,兩者的區別後續會提到,這邊可以先大概有個概念,詳細可參考官方的圖如下

Activity

一個包含程式執行環境的活動功能,通常為實際執行程式邏輯,處理的動作比起 workflow 較為單一,activity 在實際被 workflow 使用時會透過 proxyActivities 包裹,才會真正被定義 taskQuene

export async function greet(name: string): Promise<string> {
  return `Hello, ${name}!`;
}

Workflow

一個 Workflow 是由開發者定義的 Temporal 內部最小執行單元,包含一段無副作用的執行過程,每個 workflow execution 都擁有一個本地狀態,並獨佔訪問權,其他 workflow 無法直接訪問,workflow 彼此間以 並行 的方式執行,互不影響,若 workflow 彼此需要溝通可以透過 傳遞 signal 的方式進行

workflow 是一個可重入的過程,包含可恢復、反應式,不論設定多久,或是系統故障,將會自動重啟並重試

  • 可恢復:指 process 在因執行失敗,或者因執行等待而暫停後,可以繼續執行的能力
  • 反應式:指 process 可以對外部事件作出反應的能力

在 workflow 中取得 workflowInfo

呼叫 workflowInfo 可以取得當前 workflow 的資訊,比如 workflowId

import { workflowInfo } from '@temporalio/workflow'

export async function example(name: string): Promise<string> {
  const { workflowId } = workflowInfo()
  return await greet(name);
}

在 workflow 中呼叫 activity

在 workflow 中的環境不是程式環境(此為nodejs),而是一個特殊執行環境,故 activity 方法需透過 proxyActivities 解構之後才能使用,名稱與原來的方法相同

// workflows.ts
// 此為 temporal workflow 執行環境
// 執行流程看似一般程式,但其實背後 temporal 會將不同 task 分發出去給 workers 執行
// 這裏僅算是一個 workflow 工作流程的定義區
import { proxyActivities } from '@temporalio/workflow';
import type * as activities from './activities';

const { greet } = proxyActivities<typeof activities>({
  startToCloseTimeout: '30s',
  // 這邊可以定義被 proxy 的 activity 實際會觸發什麼 taskQuene
  // 讓對應監聽的 worker 去處理這個 activiyTask
  // 預設情況下會觸發 `default` worker
  // taskQuene: 'default'
});

/** A workflow that simply calls an activity */
export async function example(name: string): Promise<string> {
  return await greet(name);
}

在 workflow 中呼叫 child workflow

如果需要在當前 workflow 中調用另一個子 workflow,可以使用兩個方法

  • executeChild: 可回傳 promise await 等子 workflowTask 執行完(推薦)
  • startChild: 丟一個 child workflowTask 執行出去後就不管他
// workflows.ts
import { executeChild, proxyActivities } from '@temporalio/workflow';
import type * as activities from './activities';

const { greet } = proxyActivities<typeof activities>({
  startToCloseTimeout: '30s'
});

export async function childExample({ msg }) {
  return `Johnny is ${msg}`
}

export async function example(): Promise<string> {
  const name = await executeChild(childExample, {
    args: [{ msg: 'very well' }],
    workflowId: `child-example-${Date.now()}`,
    // taskQueue: 'default', // 這裡也可以指定 taskQuene
  })
  return await greet(name);
}

Worker

worker 是 temporal 裡面實際執行 task 的 工作者,每個 worker 會指定監聽的 taskQuene 目標,每當監聽的 quene 新增 task 時就會主動去 poll task 來執行

建立 Worker

建立時綁定每個 worker 對應可以使用的 activities, workflowsPath,但 worker 不一定要綁定 workflow,可以只綁定 activities,並在其他 workflow 中呼叫時透過 taskQuene 指定讓某個特定處理 activities 的 worker 處理

import { Worker } from '@temporalio/worker';
import * as activities from './activities';

const workers = []

async function run() {
  const worker = await Worker.create({
    workflowsPath: require.resolve('./workflows'),
    activities,
    taskQueue: 'default',
  });
  workers.push(worker)

  const activitiesWorker = await Worker.create({
    activities,
    taskQuene: 'hello-world' // proxyActivities 時指定為 hello-world,這個 worker 就會去處理~
  });
  workers.push(worker)

  await Promise.all(workers)
}

run().catch((err) => {
  console.error(err);
  workers.forEach((w) => w.shutdown());
  process.exit(1);
});

Client

client 是提供用戶端開發時進行調用 workflow task 的主要途徑,也可以透過 command line 進行調用

前提是你的 temporal server 已經啟動就緒

Programming

建立新的 temporal workflow client 實例

// client.ts
import { Workflow } from '@temporalio/workflow';
import { Connection, WorkflowClient } from '@temporalio/client';

let client: WorkflowClient;

export async function getClient() {
  if (!client) {
    const connection = await Connection.connect({
      // Connect to localhost with default ConnectionOptions.
      // In production, pass options to the Connection constructor to configure TLS and other settings:
      // address: 'foo.bar.tmprl.cloud', // as provisioned
      // tls: {} // as provisioned
    });

    client = new WorkflowClient({
      connection,
      namespace: 'default', // change if you have a different namespace
    });
  }

  return client;
}

透過 client 物件調用執行 workflow

import { getClient } from './client'
import { example } from './workflows'

async function run() {
  const client = await getClient();
  const handle = await client.start(example, {
    args: ['Temporal'], // 參數傳遞給 example workflow
    taskQueue: 'hello-world', // 加入的 taskQuene 名稱,對應 worker 必須能處理此 workflow
    workflowId: 'example-workflow-id',
    // workflow id reuse policy 參考結尾列表
    workflowIdReusePolicy: WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE,
    // retry policy
    retry: {
      maximumAttempts: 20
    }
  })
  const res = await handle.result()
}

run()

Command line

# 環境變數設為預設(local)
export TEMPORAL_CLI_ADDRESS=
# 執行 workflow
tctl wf start --tq default -i '[workflow-args]' -w [workflow-id] --wt [workflow-type]
# 取消 workflow
tctl wf cancel -w [workflow-id]

Signals & Querys

這兩者都是用在與 workflow 進行溝通傳遞訊息用,但有些微區別

Signals 範例

以下範例為一個訂閱機制 workflow

// workflows.ts
import { defineSignal, setHandler, sleep } from '@temporalio/worker'

export const cancelSubscription = defineSignal('cancelSignal'); // new

export async function SubscriptionWorkflow(
  email: string,
  trialPeriod: string | number
) {
  let isCanceled = false; // internal variable to track cancel state

  setHandler(cancelSubscription, () => void (isCanceled = true)); // new

  await acts.sendWelcomeEmail(email);
  await sleep(trialPeriod); // sleep will wait a fixed time
  if (isCanceled) {
    await acts.sendCancellationEmailDuringTrialPeriod(email); // new
  } else {
    await acts.sendSubscriptionOverEmail(email);
  }
}

- Invoke by Client

// other-client.ts
import { cancelSubscription } from './workflows';
import client from './client';

function signalWorkflow(workflowId, signalName, ...args) {
  const handle = client.getHandle(workflowId);
  await handle.signal(signalName, ...args);
}

signalWorkflow('my-workflow-id', cancelSubscription)

- Using condition with timeouts

sleep(ms) 將執行延遲固定時間,condition 將執行無限期延遲,直到給定的條件函數判斷返回 true,假設有個檢查函數,我們要判斷檢查是否完全執行完畢可以寫成這樣

// workflows.ts
export const childCompletedSignal = defineSignal(childCompletedSignal)

export async function checkDataWorkflow() {
  let completedChildWorkflows = 0;
  let totalChildWorkflows = 0;

  setHandler(childCompletedSignal, () => {
    completedChildWorkflows += 1;
  });

  const { workflowId } = workflowInfo();
  const dataList = await getChildData(); // activity to call rest api to get childData

  const childHandles = dataList.map((data) => {
    return startChild(checkPartialData, {
      args: [{
        parentWorkflowId: workflowId,
        data
      }],
      workflowId: `check-child-${Date.now()}`
    })
  })

  totalChildWorkflows = childHandles.length;

  // 這邊其實可以用 executeChild 搭配 Promise.all 更直覺,但這只是一個示範
  // wait until all child complete
  await condition(() => completedChildWorkflows === totalChildWorkflows)

  return true
}

export async function checkPartialData({
  parentWorkflowId,
  data
}) {
  // check data...
  // send signal to parent by parentWorkflowId
  await signalWorkflow(parentWorkflowId, childCompletedSignal)
}

甚至還有 inlineSignal,詳情可見Signals官網說明open in new window

Querys 範例

透過 query 取得更新資料 workflow 的更新狀態

export const userInfoStatusQuery = defineQuery('userInfoStatusQuery');

export async function updateUserInfoWorkflow() {
  let completedAt = null

  // return workflow status when being queried
  setHandle(userInfoStatusQuery, () => ({
    completed: !!completedAt,
    completedAt,
  }))

  await updateUserInfo() // activity to update user info

  completedAt = new Date();

  return true; // 即使 workflow completed 後,query 此 workflowId 仍然能取得他的狀態
}

Query by Client

跟 Signals 的處理幾乎相同,差別在可以返回狀態

// other-client.ts
import { userInfoStatusQuery } from './workflows';
import client from './client';

async function queryWorkflow(workflowId, signalName, ...args) {
  const handle = client.getHandle(workflowId);
  return await handle.query(signalName, ...args);
}

queryWorkflow('my-workflow-id', userInfoStatusQuery).then(
  (status) => console.log(status.completedAt)
)

custom useState function

import * as wf from '@temporalio/workflow';

function useState<T = any>(name: string, initialValue: T) {
  const signal = wf.defineSignal<[T]>(name);
  const query = wf.defineQuery<T>(name);
  let state: T = initialValue;
  return {
    signal,
    query,
    get value() {
      // need to use closure because function doesn't rerun unlike React Hooks
      return state;
    },
    set value(newVal: T) {
      state = newVal;
    },
  };
}

// usage in Workflow file
const store = useState('your-store', 10);
function YourWorkflow() {
  wf.setHandler(store.signal, (newValue: T) => {
    // console.log('updating', newValue) // optional but useful for debugging
    store.value = newValue;
  });
  wf.setHandler(store.query, () => store.value);
  while (true) {
    console.log('sleeping for ', store.value);
    wf.sleep(store.value++ * 100); // you can mutate the value as well
  }
}

// usage in Client file
await handle.signal(store.signal, 30);
const storeState = handle.query<number>(store.query); // 30

Continue as New

  • continue-as-newopen in new window
  • large-event-historiesopen in new window
    打開 Temporal Web UI 可以看到每個 worker 執行 task 時,左側的編號就是 event 編號,預設最大單一 worker 能夠執行的 events 數量為 50000,超過就必須 renew,建議可以如下面方式以分頁的形式 1000 筆執行一個 partialWorkflow,並透過 while loop 判定是否還有剩餘需執行的內容,避免效能崩潰
// workflows.ts
export async function fetchAllData({
  startPage,
  pageSize
}) {
  let hasNextPage = true;
  let page = startPage;

  while (hasNextPage) {
    hasNextPage = await executeChild(fetchPartialData, {
      args: [page, pageSize],
      workflowId: `fetch-partial-${page}`,
    });

    page += 1;
  }

  return true
}

export async function fetchPartialData(
  page = 0,
  pageSize = 1000
) {
  // getUsers could be your activity to call rest api to get users
  const { userIds, pageInfo } = await getUsers(page, pageSize);

  await Promise.all(
    userIds.map(async (userId) => {
      await executeChild(fetchData, {
        args: [userId],
        workflowId: `fetch-user-${userId}`,
      });
    })
  )

  return pageInfo.hasNextPage
}

export async function fetchData(userId) {
  // do something with userId
}

Install Example

Temporal 官方提供了快速搭建整套環境的 example,只需要使用 npx 快速就可以快速下載試用

下載 example

$ npx @temporalio/create@latest ./myfolder

下載 docker-compose 環境配置

獲取最新的 temporal docker-compose 配置檔案,包含不同環境的設定,下載後啟動即可

$ git clone https://github.com/temporalio/docker-compose.git temporal-server
$ cd temporal-server
$ docker-compose up

Temporal Web UI

Temporal 現在提供兩個 version 的 Web UI,就依照喜好選擇你要看哪個摟~,預設配置裡兩個會同時啟動(未來不知道會不會把 v1 移除)

其他範例

製作 timeout 效果

const wrapTimeout = (targetHandle, maxWaitSeconds = 30) => {
  return Promise.race([
    targetHandle,
    new Promise((r) => setTimeout(r, maxWaitSeconds * 1000)),
  ]);
}

const runWorkflow = async () => {
  const wfHandle = await client.start(myWorkflow, {
    args: [],
    workflowId: 'my-workflow-id'
  });
  // workflow 跑超過 30s 就直接往後不等,避免卡住
  await wrapTimeout(wfHandle.result(), 30)

  const status = await wfHandle.query(checkStatusQuery);
  return status;
};

獲取 workflow 狀態

Temporal 在 workflow handle 物件中有提供以下方法可以獲得當前指定 workflow 的相關資訊狀態

下面範例是如何在當前 workflow 中取消自己的範例,例如結束 cronSchedule 的情況,因為 workflow 環境無法使用 client 工具,必須拉出來到 activity 中使用

// my-activities.ts
import { temporal } from '@temporalio/proto';
import { getClient } from './client';

const { WorkflowExecutionStatus } = temporal.api.enums.v1;

export async function terminateWorkflow(workflowId: string) {
  const client = await getClient();
  const wfHandle = client.getHandle(workflowId);
  // 取得 workflow handle 的當前狀態
  const { status } = await wfHandle.describe();
  // 判斷 code 狀態是否處在 running
  if (status.code === WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING) {
    // 終止 cronSchedule,若使用 cancel 只會停止當前的 workflow
    // https://docs.temporal.io/concepts/what-is-a-temporal-cron-job/#how-to-stop-a-temporal-cron-job
    await wfHandle.terminate();
  }
}
// workflow.ts
import { proxyActivities, workflowInfo } from '@temporalio/workflow';
import * as myActivities from './myActivities';

const { terminateWorkflow } = proxyActivities<typeof myActivities>({
  startToCloseTimeout: '30s',
});

export async function TestWorkflow() {
  const { workflowId } = workflowInfo();
  // ...
  await terminateWorkflow(workflowId);
}

今天就介紹到這拉,後續如果有其他新的想法也會持續更新紀錄在這,感謝收看,下次再見 =V=

Reference

Last Updated:
Contributors: johnnywang