---
Implements CQRS pattern for Electron services using RxJS Observables for queries and neverthrow ResultAsync for commands. Use when creating reactive data services that need read/write separation with automatic state synchronization.
/plugin marketplace add naporin0624/claude-plugin-hono-electron
/plugin install hono-electron-ipc@hono-electron-marketplace

This skill inherits all available tools. When active, it can use any tool Claude has access to.
- OBSERVABLE-PATTERN.md
- RESULT-TYPES.md
- examples/event-service.ts

This skill documents the CQRS (Command Query Responsibility Segregation) pattern used in Electron applications with RxJS and neverthrow.
| Aspect | Query (Read) | Command (Write) |
|---|---|---|
| Purpose | Read data without side effects | Modify state |
| Return Type | Observable<T> | ResultAsync<void, Error> |
| Side Effects | None | Database writes, notifications |
| Idempotency | Always idempotent | May not be idempotent |
// src/shared/services/user.service.d.ts
import type { Observable } from 'rxjs';
import type { ResultAsync } from 'neverthrow';
import type { ApplicationError } from '@shared/typing';
/**
 * Contract for the user service, split CQRS-style: read operations return an
 * RxJS Observable, write operations return a neverthrow ResultAsync.
 */
interface UserService {
// Queries - each returns an Observable of the requested data.
/** Stream of the full user list. */
list(): Observable<User[]>;
/**
 * Stream of a single user looked up by `id`. The element type includes
 * `undefined` (presumably emitted when no user matches; confirm in the implementation).
 */
get(id: string): Observable<User | undefined>;
// Commands - each returns a ResultAsync that carries no value on success
// and an ApplicationError on failure.
/** Creates a user from `data`. */
create(data: CreateUserData): ResultAsync<void, ApplicationError>;
/** Updates a user from `data`. */
update(data: UpdateUserData): ResultAsync<void, ApplicationError>;
/** Deletes the user identified by `id`. */
delete(id: string): ResultAsync<void, ApplicationError>;
}
// src/main/services/user.service.ts
import { BehaviorSubject, Observable, concat, from } from 'rxjs';
import { distinctUntilChanged, mergeMap } from 'rxjs/operators';
import { ResultAsync, okAsync, errAsync } from 'neverthrow';
/**
 * CQRS-style user service: queries are exposed as RxJS Observables and
 * commands as neverthrow ResultAsync values. A private change signal ties the
 * two together so that a successful command makes active queries re-fetch.
 *
 * This excerpt shows only `list()` and `create()`.
 */
export class UserServiceImpl implements UserService {
  /**
   * Change signal shared by queries and commands. Being a BehaviorSubject it
   * replays its current value to every new subscriber, which is what triggers
   * the initial fetch of a query; each later `next()` triggers a re-fetch.
   */
  #notify = new BehaviorSubject(Date.now());

  constructor(private readonly db: Database) {}

  /**
   * QUERY: emits the current user list on subscription and again after every
   * change notification.
   *
   * The stream is driven by `#notify` alone. Prepending a separate initial
   * fetch would query the database twice per subscriber, because the
   * BehaviorSubject already emits once on subscribe. Notifications are not
   * de-duplicated: each one is issued after a successful write, and two writes
   * finishing within the same millisecond carry the same timestamp, so
   * de-duplicating by value could drop a real change.
   *
   * @returns An Observable of user lists.
   */
  list(): Observable<User[]> {
    return this.#notify.pipe(
      mergeMap(() => this.#getUsers()) // Fetch on subscribe and on every change
    );
  }

  /**
   * COMMAND: inserts a user and, only if the insert succeeded, notifies
   * subscribers so active queries re-fetch.
   *
   * @param data - Values for the new user.
   * @returns A ResultAsync that is Ok(void) on success, or Err(ApplicationError)
   *          if the insert failed (in which case no notification is sent).
   */
  create(data: CreateUserData): ResultAsync<void, ApplicationError> {
    return this.#insertUser(data)
      .andThen(() => {
        this.#notify.next(Date.now()); // Notify subscribers
        return okAsync(void 0);
      });
  }

  /** Private: reads all users from the database. */
  async #getUsers(): Promise<User[]> {
    return this.db.query.users.findMany();
  }

  /**
   * Private: performs the insert and converts a rejected promise into an
   * ApplicationError that wraps the original cause. The insert's own result
   * value is discarded.
   */
  #insertUser(data: CreateUserData): ResultAsync<void, ApplicationError> {
    return ResultAsync.fromPromise(
      this.db.insert(users).values(data),
      (e) => new ApplicationError('Failed to create user', e)
    ).map(() => void 0);
  }
}
┌─────────────────────────────────────────────────────────────┐
│ SERVICE LAYER (CQRS) │
├─────────────────────────────────────────────────────────────┤
│ │
│ #notify = new BehaviorSubject(Date.now()) │
│ │ │
│ ┌──────────┴──────────┐ │
│ │ │ │
│ ▼ ▼ │
│ QUERY COMMAND │
│ Observable<T> ResultAsync<void> │
│ │ │ │
│ │ concat( │ #performOperation() │
│ │ from(#getData()), │ .andThen(() => { │
│ │ #notify.pipe( │ #notify.next(Date.now()); │
│ │ mergeMap(...) │ return okAsync(void 0); │
│ │ ) │ }) │
│ │ ) │ │
│ │ │ │
│ └──────────┬──────────┘ │
│ │ │
│ ▼ │
│ DATABASE │
└─────────────────────────────────────────────────────────────┘
- list(), histories()
- get(id), active()
- notify() (returns Subject/Observable)
- create(data)
- update(data)
- delete(id)
- invite(), send()