Redmi Buds 6 Liteを睡眠時の耳栓がわりに使ってみたら意外とイケた
耳栓をつけないと音が気になって寝れない。
耳栓さえつければ入眠はできるので問題ないんだけど、ずっと耳栓をつけてると耳道に負荷はかかっているようでたまにヒリヒリしたりする。
以前から睡眠時にアクティブノイズキャンセリング(ANC)がついたイヤフォンを使うというアイディアはあったんだけど、睡眠時に装着するものなのでできるだけ小さいものではないと、寝返りをしたり横になったタイミングで違和感を感じて目が覚めたりイヤフォンが外れたりする。
Appleの耳うどんは性能的にはよさそうだったがApple製品は買わないことにしているので見送った。
それから何年も経ってようやくXiaomiが同じようなサイズでANCがついたイヤフォンをリリースしてくれた。なんと2480円(税込み)という衝撃価格。
ANC性能はなかなかで、フィット感も悪くない。
しばらく耳栓代わりに使ってみたが、正直期待していたより寝心地(?)がよく、耳栓よりトータルでは優れているかもしれない。
Redmi Buds 6 Liteが優れている点:
- 耳への負荷
- つけ心地(仰向けで寝ているとき)
Redmi Buds 6 Liteが劣っている点:
- 遮音性能
- つけ心地(横向きで寝ているとき)
- バッテリー稼働であること
Redmi Buds 6 Liteの最大の弱点が「バッテリー稼働であること」で、スペック的には持続時間は7時間。睡眠時は音楽再生はせず無音なのでこれより伸びるんだけど、8-9時間が上限。バッテリーがヘタったときはどんどん短くなってしまいそう。自然な目覚ましと考えるとまあ悪くはないが…。
スマホをXiaomi 14 Ultraに変えた - 移行作業メモ
- IIJのフリーダイアルに電話をかけてSIM移行手続きを完了
- Xiaomi 13 UltraとXiaomi 14 Ultraをケーブルで繋いでデータを移行
- 移行されたアプリのインストール完了を待つ
- Nova launcherのデータをバックアップ&復元で移行
- Microsoft Authenticatorはクラウドにバックアップ&新デバイスでは「バックアップの復元」を選択する
- Oracle Authenticatorの引き継ぎ処理をする(MFAを一旦無効化、新しいデバイスで再有効化)
- Resilio Syncで旧端末の画像、動画をバックアップ
- 新端末にResilio Syncのバックアップ設定を実施
- 旧端末を初期化
2024年度を振り返って - 買い物編
安価なもの
今年一番ハマった曲
貝印 KAI しぼり器 フルーツしぼり ミニ キッチンツール SELECT100 DH3011 シルバー 柑橘などを絞るのに意外と便利。
豆の味がして高タンパクで美味しい。
- 夜にお腹がすいたときに
- TP-Linkのソーラー充電監視カメラはスペックを考えると破格。操作アプリの出来もよくおすすめ
高価なもの
Xiaomi 13 Ultraからの乗り換え。正常進化版というフィーリング。
次の構成で揃えた。不満なし。非常に美しいプロダクト。Ubiquitiの株を買おうか迷って買わなかった。買っておけばよかった…。
Dream Machine Pro
U6 Enterprise
U6 Long-Range
PoE+ Adapter
- 単に部屋の匂いが無臭になるだけじゃなく、精神安定?リフレッシュ?のような効果を感じる。ずっと同じ匂いを嗅いでいることによる疲れとかがあるんだろうか…
- 型落ちだけど5090が出るまでの間のつなぎに購入
ZOTAC GAMING GeForce RTX 3090 Ti AMP Extreme Holo
- よいスピーカー
ELAC VELA FS 409.2
- よいDACとヘッドホンアンプ
S.M.S.L SU-X Dual ES9039MSPRO デジタル/アナログコンバーターデコーダー (DAC)
お取り寄せ
豊洲市場ドットコムでバイヤーのおすすめとして紹介されているものはだいたい美味しい。
2024年度を振り返って
プライベート
昨年末に東京から地元にUターンし、2024年度はずっと地元で生活した。東京にはかれこれ5年いたことになる。
地元ではマトモな職がなかったため、ツイッターのフォロワーの誘いもあり「フリーランスのソフトウェアエンジニアで月収100万!!」みたいな景気のよい話を真に受けて上京してみたら本当にそういう給与レンジに数年で到達できたので上京しての就活は大正解だった。
しかし東京の生活は自分には合わない面もあり、特に家族が近くにいないのはキツかった。お金を稼げるようにはなったが、稼いでそれで何するの?というエクスキューズに対する答えを見つけられなかった。
お金が溜まっていけば早くFIREできるというのはそうだが、その生活が楽しいかというと疑問符がついた。
たびたび実家に帰るときに東京駅で山ほどお土産を買って帰ると楽しいということがあり、つまりお金を稼いで家族に分配するという本能的な喜びが東京での一人暮らしでは日常的には得られなかったのだと後で気づいた。
年老いていく家族に年何回かしか顔を合わせない生活だと、あとたかだか何十回か会ったら終わりだというのも受け入れがたかった。
地元に戻ってからは上記の問題は解消し、ストレスなく生活できている。
仕事
2019年から外資ソフトウェアベンダーでSupport Engineerとして働いており、ここ数年はManagerを兼ねるような仕事をしている。
2023年の後半に休職してスノボで負傷した肩の手術とリハビリをした。術後に完全に手を同じ姿勢で固定しなければいけない期間があり、そのストレスでパニック障害のような症状が出たりして…、とそれはそれで大きなトピックなのだが、他に遥かに大きなインパクトの事件があった。そう、休職から戻ってきてみたら部署のリーダー3人のうち、2人が辞めてしまっていた…。
残された3人のうち1人である私の2024年は、それはそれは大変なことになった。
以前は他のリーダー1名がサーバー関連(Server Line)の製品群を見て、私を含む2人がデスクトップ関連(Desktop Line)の製品群を見ていたのだが、私が1人でDesktop Lineを見るようになった。Server Lineに関しても新しいリーダーの習熟までフォローを行ったが、これは新たに選出されたリーダーが優秀なこともあり、負担としては軽いものだった。
部内には30名弱のスタッフがいて、2/3程度がDesktop Lineに属している。今年は彼らのマネージメントをより深くやってきた。
グローバルと横並びで見たときのJapanのKPIの弱さ(NPSがNeutralに偏る)を私は特に気にかけてケアを行った。KPI改善のための施策として、すべてのサポートスタッフが質問をいつでもSlackチャンネルに投稿できるようにして、この回答を質問を受ける側に義務付けた。誰でも質問に回答してよいが、回答がなかったり、回答者も判断に迷えば質問は最終的に私に上がってる。他の施策もあり、全マネージャー、全スタッフが今年は注意をしてケアを行えたと思う。
数多くの製品を持ちそれぞれが急速に進化するため、製品ごとに置いたSME(Subject Matter Expert)のトレーニングは常に行っていかないといけない。
以前から進められているグローバルへのアラインによって、担当製品群に関するVirtual TeamにJapanからも参加して双方向のコミュニケーションを取るようになった。
このMTGが目下最大の課題で、私の英語力が必要なレベルに達していないことによって、かなりの事前準備のコストを払いストレスを受け…、そしてそれに比例した楽しさ、やりがいを感じている。
顧客から個々の問い合わせに関してはJapanもグローバルも類似した内容になるということはわかっていたが、マネージメントのレベルでも、これまで我々が悩んできたあるいは解決した問題が、グローバルと共通であるということは新しい発見で、このレイヤーでの知見の共有によるシナジーは今後私がやっていかなければならないことになるだろう。
いろいろチャレンジはあったものの、なんとかなった。立場が人を作るという格言に共感する1年だった。
Introduce Whisper large-v3-turbo model into faster-whisper and whisper_streaming
Introduce Whisper large-v3-turbo into faster-whisper
All you have to do is just follow the hints discussed in the issue below:
- Fetch the model into the local
from huggingface_hub import snapshot_download repo_id = "deepdml/faster-whisper-large-v3-turbo-ct2" local_dir = "faster-whisper-large-v3-turbo-ct2" snapshot_download(repo_id=repo_id, local_dir=local_dir, repo_type="model")
- Specify
faster-whisper-large-v3-turbo-ct2
when creating WhisperModel
from faster_whisper import WhisperModel model = WhisperModel("faster-whisper-large-v3-turbo-ct2")
Introduce Whisper large-v3-turbo into whisper_streaming
- Add
faster-whisper-large-v3-turbo-ct2
into the choise set of --model argument
parser.add_argument('--model', type=str, default='large-v2', choices="tiny.en,tiny,base.en,base,small.en,small,medium.en,medium,large-v1,large-v2,large-v3,large,faster-whisper-large-v3-turbo-ct2".split(","),help="Name size of the Whisper model to use (default: large-v2). The model is automatically downloaded from the model hub if not present in model cache dir.")
- Specify
faster-whisper-large-v3-turbo-ct2
as the model when calling whisper_online_server.py
python whisper_online_server.py --model faster-whisper-large-v3-turbo-ct2 --backend faster-whisper # and so on
Connect Cloud Functions to Postgres(on-prem) with Tailscale and Gost
Serverless functions have become increasingly popular due to their scalability and ease of use. However, connecting these functions to on-premises resources, such as databases, can be challenging. In this post, I'll explore an solution to connect Google Cloud Functions to an on-premises PostgreSQL database using Tailscale for secure networking and Gost for port forwarding.
Problem
Cloud Functions are designed to be stateless and ephemeral, making it difficult to establish persistent connections to resources outside the cloud environment. This becomes particularly problematic when you need to interact with on-premises databases that aren't directly accessible from the cloud.
Solution
We'll use a combination of technologies to bridge this gap:
- Google Cloud Functions: Our serverless compute platform.
- Tailscale: A modern VPN built on WireGuard, providing secure networking.
- Gost: A versatile proxy server that will forward our database connections.
Let's dive into the implementation details.
The Cloud Function code
First, let's look at our basic Cloud Function code:
import functions_framework @functions_framework.cloud_event def cloud_event_function(cloud_event): pass
This is a minimal Cloud Function that uses the functions_framework
decorator. In a real-world scenario, you'd add your database interaction logic here.
Building the Custom Container
To incorporate Tailscale and Gost, we need to build a custom container. Here's our Dockerfile:
FROM golang:1.22.6-bullseye AS builder # Build gost. WORKDIR /proxy RUN git clone https://github.com/ginuerzh/gost.git \ && cd gost/cmd/gost \ && go build FROM python:3.10 # Copy gost binary. WORKDIR /proxy COPY --from=builder /proxy/gost/cmd/gost/gost ./ WORKDIR /app # Install dependencies. COPY requirements.txt ./ RUN pip install --no-cache-dir -r requirements.txt # Copy source code. COPY . . RUN chmod +x start.sh # Copy Tailscale binaries from the tailscale image on Docker Hub COPY --from=docker.io/tailscale/tailscale:stable /usr/local/bin/tailscaled /app/tailscaled COPY --from=docker.io/tailscale/tailscale:stable /usr/local/bin/tailscale /app/tailscale RUN mkdir -p /var/run/tailscale /var/cache/tailscale /var/lib/tailscale # Run on container startup. CMD ["/app/start.sh"]
This Dockerfile does several important things:
- It builds Gost from source in a separate stage.
- It sets up a Python environment for our Cloud Function.
- It copies the Tailscale binaries from the official Docker image.
Configuring the Startup Script
The heart of our solution lies in the startup script:
#!/bin/sh echo Tailscale starting... /app/tailscaled --tun=userspace-networking --socks5-server=localhost:1055 & /app/tailscale up --authkey=${TAILSCALE_OAUTH_CLIENT_SECRET} --hostname=${TAILSCALE_MACHINE_NAME} --advertise-tags=tag:tag_for_your_postgres_server echo Tailscale started echo Gost starting... /proxy/gost -L tcp://:5432/your_postgres_server:5432 -F socks5://localhost:1055 & echo Gost started functions-framework --target cloud_event_function --signature-type cloudevent
This script:
- Starts Tailscale in userspace networking mode and connects to your Tailscale network.
- Launches Gost, configuring it to forward connections from port 5432 to my on-prem PostgreSQL server through the Tailscale SOCKS5 proxy.
- Starts the Cloud Function using the functions-framework.
How It All Works Together
When deployed, this setup creates a secure tunnel between your Cloud Function and your on-premises network:
- Tailscale establishes a secure connection to your private network.
- Gost listens on port 5432 (the default PostgreSQL port) and forwards connections through the Tailscale network.
- Your Cloud Function can now connect to
localhost:5432
, and Gost will securely proxy the connection to your on-premises PostgreSQL server.
Deployment and Configuration
To deploy this solution:
- Build and push your container image to Google Container Registry or Artifact Registry.
- Deploy your Cloud Function, specifying your custom container.
- Set the necessary environment variables:
TAILSCALE_OAUTH_CLIENT_SECRET
: Your Tailscale auth keyTAILSCALE_MACHINE_NAME
: A unique name for this Tailscale node
Conclusion
This approach allows you to securely connect your Cloud Functions to on-premises resources without exposing your database to the public internet. It combines the scalability and ease of use of serverless functions with the security and flexibility of a modern VPN solution.
While We're focused on PostgreSQL, this method can be adapted for other databases or services. The combination of Tailscale and Gost provides a powerful and flexible way to bridge the gap between cloud and on-premises resources.
Remember to always follow security best practices and thoroughly test your setup before using it in a production environment.
Mobile-First Data Handling Strategy Realized Through Client-Side Event Sourcing
↓ It's a technical article, but it's about how this app was created ↓
Please check it out!
1. Introduction
This article proposes a client-side data management method using event sourcing.
The proposal aims to address the following requirements:
- Reduce database server maintenance costs and allow the application to hold data independently in the initial stages
- Design that can flexibly accommodate future needs for synchronization with servers
- Design that can flexibly respond to changes in data aggregation requirements
- Easily usable from React-implemented client applications
This article will demonstrate that the proposed implementation meets the above requirements and can ultimately be used from React-based client applications in the following manner:
Recording events
const { addEvent } = useListenEvent(); addEvent({ itemId });
Using aggregated data
const { totals } = useListenEvent(); const total = totals[itemId];
2. Technologies Used
To realize this proposal, we adopted event sourcing and CQRS.
Event sourcing is a technique that records state changes in an application as events and reconstructs the current state by replaying these events. This method has the following advantages:
- Maintaining a complete audit trail
- Ability to rewind the state to any point in time
- Flexible analysis and derivation of models based on events
CQRS is an architectural pattern that separates the responsibilities of data updates (commands) and reads (queries). This pattern has the following advantages:
- Ability to optimize reading and writing independently
- Simplification of complex domain models
3. System Design
The proposed system consists of the following components:
- EventStore: Responsible for storing and retrieving events
- EventAggregator: Responsible for aggregating events and managing state
- React Hooks: Responsible for connecting UI and data layer
By combining these components, we realize event sourcing and CQRS on the client side.
4. Implementation Details
4.1 Data Structures
The basic data structures are as follows:
// Base type for aggregatable events interface Aggregatable {} // Type for data recorded in the local database interface StoredEvent<V extends Aggregatable> { localId: number; // Auto-increment primary key globalId: string; // UUIDv7 for global primary key value: V; } // Base type for aggregated data interface AggregatedValue<T> { aggregationKey: T; }
Events are defined as Aggregatable, become StoredEvent when saved in the local database, and then become AggregatedValue after aggregation processing.
Aggregatable and AggregatedValue can freely set properties in interfaces that inherit from them.
StoredEvent has a localId that is automatically numbered locally, as well as a globalId in UUIDv7 format. These values ensure that processing can be done in chronological order with some degree of accuracy even if the data is sent to the server and processed in the future.
Based on these interfaces, specific events and aggregated values are defined.
4.2 EventStore
The EventStore class provides functionality to store events using IndexedDB and retrieve them efficiently. The main features are as follows:
- Adding events
- Retrieving events before and after a specified ID
- Event subscription functionality
import { Aggregatable, StoredEvent } from "./types"; import { v7 as uuidv7 } from 'uuid'; export interface GetEventsOptions { limit?: number; } const DatabaseVersion = 1; const DataStoreName = 'Events'; export class EventStore<V extends Aggregatable> { private db: IDBDatabase | null = null; private listeners: Set<(msg: StoredEvent<V>) => Promise<void>> = new Set(); constructor(public databaseName: string) {} async initialize(): Promise<void> { await this.initializeDatabase(); } async add(value: V): Promise<void> { if (!this.db) throw new Error('Database not initialized'); return new Promise((resolve, reject) => { const trx = this.db!.transaction([DataStoreName], 'readwrite'); const store = trx.objectStore(DataStoreName); const globalId = uuidv7(); const request = store.add({ globalId, value }); request.onerror = () => reject(new Error(`Add error: ${request.error?.message || 'Unknown error'}`)); request.onsuccess = () => { const localId = request.result as number; const storedEvent: StoredEvent<V> = { localId, globalId, value }; this.broadcastAddEvent(storedEvent); resolve(); }; trx.onerror = () => reject(new Error(`Transaction error: ${trx.error?.message || 'Unknown error'}`)); trx.onabort = () => reject(new Error('Transaction aborted')); }); } async getEventsAfter(localId: number, options?: GetEventsOptions): Promise<StoredEvent<V>[]> { return this.getEvents( 'next', IDBKeyRange.lowerBound(localId, true), options); } async getEventsBefore(localId: number, options?: GetEventsOptions): Promise<StoredEvent<V>[]> { return this.getEvents( 'prev', IDBKeyRange.upperBound(localId, true), options); } private async getEvents( direction: IDBCursorDirection, range: IDBKeyRange, options?: GetEventsOptions ): Promise<StoredEvent<V>[]> { if (!this.db) throw new Error('Database not initialized'); return new Promise((resolve, reject) => { const trx = this.db!.transaction([DataStoreName], 'readonly'); const store = trx.objectStore(DataStoreName); const results: StoredEvent<V>[] = []; const request = store.openCursor(range, direction); request.onerror = () => reject(request.error); request.onsuccess = (event) => { const cursor = (event.target as IDBRequest<IDBCursorWithValue>).result; if (cursor) { const storedEvent: StoredEvent<V> = { localId: cursor.key as number, globalId: cursor.value.globalId, value: cursor.value.value }; results.push(storedEvent); if (!options?.limit || results.length < options.limit) { cursor.continue(); } else { resolve(results); } } else { resolve(results); } }; }); } private async initializeDatabase(): Promise<void> { ... snip ... } async hasAnyRecord(): Promise<boolean> { ... snip ... } private broadcastAddEvent(event: StoredEvent<V>) { ... snip ... } subscribe(listener: (msg: StoredEvent<V>) => Promise<void>): () => void { ... snip... } dispose() { ...snip... } }
4.3 EventAggregator
The EventAggregator class aggregates events and manages the current state. It is an abstract class, and concrete classes inheriting from it need to implement the functions marked as abstract. The main features are as follows:
- Processing new events
- Batch processing for event aggregation
- Managing processed ranges
- Providing aggregation results
import {AggregatedValue, Aggregatable, StoredEvent} from "./types"; import { EventStore, GetEventsOptions } from "./eventStore"; interface ProcessedRange { start: number; end: number; } const MetadataStoreName = 'Metadata'; const ProcessedRangesKey = 'ProcessedRanges'; export type AggregatorChangeListener<V> = (changedItem: V) => void; export abstract class EventAggregator<V extends Aggregatable, A extends AggregatedValue<string>> { protected db: IDBDatabase | null = null; private processedRanges: ProcessedRange[] = []; private processingIntervalId: number | null = null; private listeners: Set<AggregatorChangeListener<A>> = new Set(); constructor( protected eventStore: EventStore<V>, protected databaseName: string, protected databaseVersion: number = 1, protected batchSize: number = 100, protected processingInterval: number = 1000, ) {} async initialize(): Promise<void> { ...snip... } // Implement these abstract functions in inherited classes protected abstract applyMigrations(db: IDBDatabase, oldVersion: number, newVersion: number): void; protected abstract processEvent(trx: IDBTransaction, event: StoredEvent<V>): Promise<A>; private applyMetadataMigrations(db: IDBDatabase, oldVersion: number, newVersion: number): void { if (oldVersion < 1) { const store = db.createObjectStore(MetadataStoreName, {keyPath: 'key'}); } } private async handleNewEvent(ev: StoredEvent<V>): Promise<void> { if (!this.db) throw new Error('Database not initialized'); const tx = this.db.transaction(this.db.objectStoreNames, 'readwrite'); try { const changedItem = await this.processEvent(tx, ev); this.updateProcessedRanges(ev.localId, ev.localId); await this.saveProcessedRanges(tx); if (changedItem) { this.notifyListeners([changedItem]); } return new Promise<void>((resolve, reject) => { tx.oncomplete = () => resolve(); tx.onerror = () => reject(tx.error); }); } catch (err) { console.error("Error processing new event:", err); tx.abort(); } } private async processEvents(): Promise<void> { if (!this.db) throw new Error('Database not initialized'); if (this.isFullyCovered()) { this.stopProcessing(); return; } const range = await this.findRangeToProcess(); if (!range) { return; } const options: GetEventsOptions = { limit: Math.min(this.batchSize, range.end - range.start - 1) }; const eventsBefore = await this.eventStore.getEventsBefore(range.end, options); if (eventsBefore.length === 0) { return; } const maxId = eventsBefore[0].localId; const minId = eventsBefore[eventsBefore.length-1].localId; const changedItems: A[] = []; const tx = this.db.transaction(this.db.objectStoreNames, 'readwrite'); try { for (const ev of eventsBefore) { const changedItem = await this.processEvent(tx, ev); if (changedItem) { changedItems.push(changedItem); } } if (eventsBefore.length < this.batchSize) { this.updateProcessedRanges(1, maxId); } else { this.updateProcessedRanges(minId, maxId); } await this.saveProcessedRanges(tx); return new Promise<void>((resolve, reject) => { tx.oncomplete = () => resolve(); tx.onerror = () => reject(tx.error); }); } catch (err) { console.error("Error processing events:", err); tx.abort(); } this.notifyListeners(changedItems); } private async findRangeToProcess(): Promise<ProcessedRange | null> { const size = this.processedRanges.length; if (size === 0) { return { start: 0, end: Number.MAX_SAFE_INTEGER } } const rangeEnd = this.processedRanges[size-1].start; if (rangeEnd === 1) { return null; } if (rangeEnd === 0) { throw new Error('Unexpected value'); } if (1 < size) { const rangeStart = this.processedRanges[size-2].end; return { start: rangeStart, end: rangeEnd }; } // size === 1 return { start: 0, end: rangeEnd }; } private isFullyCovered(): boolean { if (this.processedRanges.length === 0) { this.eventStore.hasAnyRecord().then((hasAnyRecord) => { return !hasAnyRecord; }); return false; } return this.processedRanges.length === 1 && this.processedRanges[0].start === 0; } private updateProcessedRanges(start: number, end: number): void { const newRange: ProcessedRange = { start, end }; const allRanges = [...this.processedRanges, newRange]; allRanges.sort((a, b) => a.start - b.start); const mergedRanges: ProcessedRange[] = []; let currentRange = allRanges[0]; for (let i = 1; i < allRanges.length; i++) { const nextRange = allRanges[i]; if (currentRange.end + 1 >= nextRange.start) { currentRange.end = Math.max(currentRange.end, nextRange.end); } else { mergedRanges.push(currentRange); currentRange = nextRange; } } mergedRanges.push(currentRange); this.processedRanges = mergedRanges; } private async loadProcessedRanges(): Promise<void> { ...snip... } private async saveProcessedRanges(tx: IDBTransaction): Promise<void> { ...snip... } startProcessing(): void { ...snip... } stopProcessing(): void { ...snip... } private notifyListeners(changes: A[]): void { ...snip... } subscribe(listener: AggregatorChangeListener<A>): () => void { ...snip... } private async initializeDatabase(): Promise<void> { ...snip... } dispose() { ... snip... } }
5. Specific Use Case
To demonstrate the effectiveness of this system, we present an example of its use in a music playback application.
5.1 Recording and Aggregating Music Playback Events
MusicListenEventStore stores events indicating that music has been played. MusicListenEventAggregator aggregates the total number of plays for each track.
MusicListenEventStore and MusicListenEventAggregator Classes
import {Aggregatable, AggregatedValue, StoredEvent} from "../../types"; import {EventStore} from "../../eventStore"; import {EventAggregator} from "../../eventAggregator"; const TotalStoreName = 'Total'; export interface MusicListenEvent extends Aggregatable { itemId: string; } export interface MusicListenAggregationValue extends AggregatedValue<string> { total: number; } export class MusicListenEventStore extends EventStore<MusicListenEvent> {} export class MusicListenEventAggregator extends EventAggregator<MusicListenEvent, MusicListenAggregationValue> { constructor( protected eventStore: EventStore<MusicListenEvent>, protected databaseName: string, protected databaseVersion: number = 1, protected batchSize: number = 100, protected processingInterval: number = 1000, ) { ...snip... } protected applyMigrations(db: IDBDatabase, oldVersion: number, newVersion: number): void { if (oldVersion < 1) { db.createObjectStore(TotalStoreName, {keyPath: 'aggregationKey'}); } } protected async processEvent(trx: IDBTransaction, event: StoredEvent<MusicListenEvent>): Promise<MusicListenAggregationValue> { return new Promise((resolve, reject) => { const store = trx.objectStore(TotalStoreName); const aggregationKey = event.value.itemId; const getReq = store.get(aggregationKey); getReq.onerror = (error) => reject(getReq.error); getReq.onsuccess = () => { const data = getReq.result as MusicListenAggregationValue | undefined; const total = (data?.total ?? 0) + 1; const updated: MusicListenAggregationValue = { aggregationKey: aggregationKey, total }; const putReq = store.put(updated); putReq.onerror = () => reject(putReq.error); putReq.onsuccess = () => resolve(updated); }; }); } async getAggregatedTotal(itemIds: string[]): Promise<{ [key: string]: number }> { if (!this.db) throw new Error('Database not initialized'); return new Promise((resolve, reject) => { const trx = this.db!.transaction([TotalStoreName], 'readonly'); const store = trx.objectStore(TotalStoreName); const getItemData = (itemId: string): Promise<[string, number]> => new Promise((resolveItem, rejectItem) => { const request = store.get(itemId); request.onerror = () => rejectItem(new Error(`Error fetching data for item ${itemId}: ${request.error}`)); request.onsuccess = () => { const data = request.result as MusicListenAggregationValue; resolveItem([itemId, data ? data.total : 0]); }; }); Promise.all(itemIds.map(getItemData)) .then(entries => entries.reduce((acc, [key, value]) => ({ ...acc, [key]: value }), {}) ) .then(resolve) .catch(reject); trx.onerror = () => { reject(new Error(`Transaction error: ${trx.error}`)); }; }); } }
If changes to the aggregation logic or additions to the aggregation targets become necessary after release, a new Aggregator can be created.
For example, consider creating a MusicListenEventAggregatorV2 that newly adds "time of first play" as an aggregation target.
In this case, if you specify an instance of MusicListenEventStore to the constructor of MusicListenEventAggregatorV2 in the same way as MusicListenEventAggregator, MusicListenEventAggregatorV2 can also aggregate all events accumulated in MusicListenEventStore.
useListenEvent Hook
By using a React custom hook like the following, it becomes possible to easily add events and retrieve aggregation results from React components using the above classes.
'use client'; import React, {createContext, useContext, useState, useEffect, ReactNode, useRef, useMemo} from 'react'; import {MusicListenAggregationValue, MusicListenEvent, MusicListenEventAggregator} from './listenEvent'; import {EventStore} from "../../eventStore"; type ListenEventContextType = { addEvent: (event: MusicListenEvent) => Promise<void>; totals: { [key: string]: number }; isInitializing: boolean; isSyncing: boolean; error: Error | null; }; export const ListenEventContext = createContext<ListenEventContextType | undefined>(undefined); export type ListenEventContextProps = { keys: string[]; children: ReactNode; }; export const ListenEventProvider: React.FC<ListenEventContextProps> = ({ keys, children }) => { const [totals, setTotals] = useState<{ [key: string]: number }>({}); const [isInitializing, setIsInitializing] = useState(true); const [isSyncing, setIsSyncing] = useState(false); const [error, setError] = useState<Error | null>(null); const { eventStore, aggregator } = useMemo(() => { const eventStore = new EventStore<MusicListenEvent>('MusicListenEvents'); const aggregator = new MusicListenEventAggregator(eventStore, 'MusicListenAggregator_V2'); (async() => { await eventStore.initialize(); await aggregator.initialize(); aggregator.startProcessing(); })().then(() => { setIsInitializing(false); setIsSyncing(true); }).catch((err) => { setError(new Error(`Failed to initialize EventStore/Aggregator: ${err}`)); setIsInitializing(false); setIsSyncing(false); }); return { eventStore, aggregator }; }, []); useEffect(() => { const handleUpdate = (updated: MusicListenAggregationValue) => { setTotals(prevTotals => ({ ...prevTotals, [updated.aggregationKey]: updated.total })); }; const unsubscribe = aggregator.subscribe(handleUpdate); return () => { if (typeof unsubscribe === 'function') { unsubscribe(); } }; }, [eventStore, aggregator]); useEffect(() => { if (isInitializing || error != null) return; const fetchTotals = async () => { try { const result = await aggregator.getAggregatedTotal(keys); if (result != null) { setTotals(result); } } catch (err) { setError(err instanceof Error ? err : new Error('Failed to fetch totals')); } setIsSyncing(false); }; fetchTotals(); }, [keys, isInitializing]); return ( <ListenEventContext.Provider value={{ addEvent: eventStore.add.bind(eventStore), totals, isInitializing, isSyncing, error }}> {children} </ListenEventContext.Provider> ); }; export const useListenEvent = () => { const context = useContext(ListenEventContext); if (context === undefined) { throw new Error('useListenTotal must be used within a ListenEventContext'); } return context; };
The usage is as shown at the beginning:
Recording an event
const { addEvent } = useListenEvent(); addEvent({ itemId });
Using aggregated data
const { totals } = useListenEvent(); const total = totals[itemId];
However, since React.Context is used, the above processes need to be written within the following ListenEventProvider:
<ListenEventProvider keys={listenKeys}> // Need to execute within this </ListenEventProvider>
6. Challenges
For the implementation based on this proposal to effectively integrate with the server side, there are the following challenges:
- Implementing a mechanism to upload events recorded on the client to the server
- Efficient processing and aggregation of event data on the server
- Ensuring consistency between client-side and server-side aggregation results
7. Conclusion
This article proposed a method for implementing event sourcing and CQRS on the client side.
This proposal makes it possible to balance the reduction of database server maintenance costs in the initial stages with ensuring future extensibility.
We hope this proposal will serve as an aid for efficient data management methods for mobile application and SPA developers.