Yjs
This is an example application using Electric with Yjs.
Electric Yjs provider
An example text editor showing how to use Yjs and Electric together. It uses the Y-Electric provider to sync Yjs document and awareness changes to all connected clients. Check out the package to learn how to integrate Yjs with your existing app:
typescript
import * as encoding from 'lib0/encoding'
import * as decoding from 'lib0/decoding'
import * as awarenessProtocol from 'y-protocols/awareness'
import { ObservableV2 } from 'lib0/observable'
import * as env from 'lib0/environment'
import * as Y from 'yjs'
import {
GetExtensions,
isChangeMessage,
isControlMessage,
Message,
Offset,
Row,
ShapeStream,
ShapeStreamOptions,
} from '@electric-sql/client'
import {
YProvider,
ResumeState,
SendErrorRetryHandler,
ElectricProviderOptions,
} from './types'
/**
 * Payload of an awareness `update` event: the client ids whose awareness
 * state was added, updated or removed. The provider concatenates the three
 * lists to decide which clients' states to encode and send.
 */
type AwarenessUpdate = Record<`added` | `updated` | `removed`, number[]>
/**
 * Yjs provider that synchronises a `Y.Doc` (and, optionally, an awareness
 * instance) through Electric shape streams.
 *
 * Remote changes are read from shape streams and applied to the document with
 * the origin `server`; local changes are batched and pushed to `sendUrl` with
 * HTTP `PUT` requests. The provider emits `status`, `synced`, `sync`,
 * `resumeState` and `connection-close` events.
 */
export class ElectricProvider<
  RowWithDocumentUpdate extends Row<decoding.Decoder> = never,
  RowWithAwarenessUpdate extends Row<decoding.Decoder> = never,
> extends ObservableV2<YProvider> {
  private doc: Y.Doc

  private documentUpdates: {
    shape: ShapeStreamOptions<GetExtensions<RowWithDocumentUpdate>>
    sendUrl: string | URL
    getUpdateFromRow: (row: RowWithDocumentUpdate) => decoding.Decoder
    sendErrorRetryHandler?: SendErrorRetryHandler
  }

  private awarenessUpdates?: {
    shape: ShapeStreamOptions<GetExtensions<RowWithAwarenessUpdate>>
    sendUrl: string | URL
    protocol: awarenessProtocol.Awareness
    getUpdateFromRow: (row: RowWithAwarenessUpdate) => decoding.Decoder
    sendErrorRetryHandler?: SendErrorRetryHandler
  }

  private _connected: boolean = false
  private _synced: boolean = false

  private resumeState: ResumeState

  // Document changes: at most one send loop runs at a time; updates produced
  // meanwhile are merged into `pendingChanges`.
  private sendingPendingChanges: boolean = false
  private pendingChanges: Uint8Array | null = null

  // Awareness changes: only the most recent pending update is kept.
  private sendingAwarenessState: boolean = false
  private pendingAwarenessUpdate: AwarenessUpdate | null = null

  private documentUpdateHandler: (
    update: Uint8Array,
    origin: unknown,
    doc: Y.Doc,
    transaction: Y.Transaction
  ) => void
  private awarenessUpdateHandler?: (
    update: AwarenessUpdate,
    origin: unknown
  ) => void

  private exitHandler: () => void
  private unsubscribeShapes?: () => void

  private fetchClient?: typeof fetch

  /**
   * Creates a new ElectricProvider instance that connects YJS documents to Electric SQL.
   *
   * @constructor
   * @param {ElectricProviderOptions} options - Configuration options for the provider
   * @param {Y.Doc} options.doc - The YJS document to be synchronized
   * @param {Object} options.documentUpdates - Document updates configuration
   * @param {ShapeStreamOptions} options.documentUpdates.shape - Options for the document updates shape stream
   * @param {string|URL} options.documentUpdates.sendUrl - URL endpoint for sending document updates
   * @param {Function} options.documentUpdates.getUpdateFromRow - Function to extract document update from row
   * @param {SendErrorRetryHandler} [options.documentUpdates.sendErrorRetryHandler] - Error handler for retrying document updates
   * @param {Object} [options.awarenessUpdates] - Awareness updates configuration (optional)
   * @param {ShapeStreamOptions} options.awarenessUpdates.shape - Options for the awareness updates shape stream
   * @param {string|URL} options.awarenessUpdates.sendUrl - URL endpoint for sending awareness updates
   * @param {awarenessProtocol.Awareness} options.awarenessUpdates.protocol - Awareness protocol instance
   * @param {Function} options.awarenessUpdates.getUpdateFromRow - Function to extract awareness update from row
   * @param {SendErrorRetryHandler} [options.awarenessUpdates.sendErrorRetryHandler] - Error handler for retrying awareness updates
   * @param {ResumeState} [options.resumeState] - Resume state for the provider
   * @param {boolean} [options.connect=true] - Whether to automatically connect upon initialization
   * @param {typeof fetch} [options.fetchClient] - Custom fetch implementation to use for HTTP requests
   */
  constructor({
    doc,
    documentUpdates: documentUpdatesConfig,
    awarenessUpdates: awarenessUpdatesConfig,
    resumeState,
    connect = true,
    fetchClient,
  }: ElectricProviderOptions<RowWithDocumentUpdate, RowWithAwarenessUpdate>) {
    super()

    this.doc = doc
    this.documentUpdates = documentUpdatesConfig
    this.awarenessUpdates = awarenessUpdatesConfig
    this.resumeState = resumeState ?? {}
    this.fetchClient = fetchClient

    // Destroy the provider when a Node process exits. The same function
    // reference is removed again in `destroy()`.
    this.exitHandler = () => this.destroy()
    if (env.isNode && typeof process !== `undefined`) {
      process.on(`exit`, this.exitHandler)
    }

    // Keep the bound listener itself so `destroy()` can unregister it without
    // depending on the return value of `doc.on`.
    this.documentUpdateHandler = this.applyDocumentUpdate.bind(this)
    this.doc.on(`update`, this.documentUpdateHandler)

    if (this.awarenessUpdates) {
      this.awarenessUpdateHandler = this.applyAwarenessUpdate.bind(this)
      this.awarenessUpdates.protocol.on(`update`, this.awarenessUpdateHandler)
    }

    // enqueue unsynced changes from document if the
    // resume state provides the document state vector
    if (this.resumeState.stableStateVector) {
      this.pendingChanges = Y.encodeStateAsUpdate(
        this.doc,
        this.resumeState.stableStateVector
      )
    }

    if (connect) {
      this.connect()
    }
  }

  /** Whether the document stream is up to date and no send was in flight at that point. */
  get synced() {
    return this._synced
  }

  /** Updates the synced flag and emits `synced` and `sync` when it changes. */
  set synced(state) {
    if (this._synced !== state) {
      this._synced = state
      this.emit(`synced`, [state])
      this.emit(`sync`, [state])
    }
  }

  /**
   * Updates the connected flag. On a change it flushes queued document
   * changes (when becoming connected) and then emits a `status` event.
   */
  set connected(state) {
    if (this._connected !== state) {
      this._connected = state
      if (state) {
        void this.sendOperations()
      }
      this.emit(`status`, [{ status: state ? `connected` : `disconnected` }])
    }
  }

  /** Whether the document shape stream has reported `up-to-date` since the last connect. */
  get connected() {
    return this._connected
  }

  /** Queues a document update, merging it with anything already queued. */
  private batch(update: Uint8Array) {
    if (this.pendingChanges) {
      this.pendingChanges = Y.mergeUpdates([this.pendingChanges, update])
    } else {
      this.pendingChanges = update
    }
  }

  /**
   * True when there are queued document changes worth sending.
   * Updates of two bytes or fewer are treated as empty (presumably the size
   * of an empty encoded Yjs update - TODO confirm).
   */
  private hasUnsentChanges(): boolean {
    return this.pendingChanges !== null && this.pendingChanges.length > 2
  }

  /** Disconnects and unregisters every listener installed by the constructor. */
  destroy() {
    this.disconnect()

    this.doc.off(`update`, this.documentUpdateHandler)
    if (this.awarenessUpdates && this.awarenessUpdateHandler) {
      this.awarenessUpdates.protocol.off(`update`, this.awarenessUpdateHandler)
    }

    if (env.isNode && typeof process !== `undefined`) {
      process.off(`exit`, this.exitHandler)
    }

    super.destroy()
  }

  /**
   * Stops the shape streams. If the provider was connected it also clears
   * awareness state, emits `connection-close` and resets the connected and
   * synced flags.
   */
  disconnect() {
    // Always tear down the streams, even if we never reached `connected`.
    this.unsubscribeShapes?.()

    if (!this.connected) {
      return
    }

    const awareness = this.awarenessUpdates?.protocol
    if (awareness) {
      // Forget every remote client; origin `this` keeps it from being sent.
      const remoteClients = Array.from(awareness.getStates().keys()).filter(
        (client) => client !== awareness.clientID
      )
      awarenessProtocol.removeAwarenessStates(awareness, remoteClients, this)

      // try to notify other clients that we are disconnecting
      awarenessProtocol.removeAwarenessStates(
        awareness,
        [awareness.clientID],
        `local`
      )

      awareness.setLocalState({})
    }

    // TODO: await for events before closing
    this.emit(`connection-close`, [])

    this.pendingAwarenessUpdate = null
    this.connected = false
    this.synced = false
  }

  /**
   * Opens the document shape stream (and the awareness stream when
   * configured) and emits a `connecting` status. Does nothing if the provider
   * is already connected or a connection attempt is still in progress.
   */
  connect() {
    // `unsubscribeShapes` is set while streams are open; checking it prevents
    // a second call from opening duplicate streams and leaking the first pair.
    if (this.connected || this.unsubscribeShapes) {
      return
    }

    const abortController = new AbortController()

    const operationsStream = new ShapeStream<RowWithDocumentUpdate>({
      ...this.documentUpdates.shape,
      ...this.resumeState.document,
      signal: abortController.signal,
    })

    const operationsShapeUnsubscribe = operationsStream.subscribe(
      (messages) => {
        this.operationsShapeHandler(
          messages,
          operationsStream.lastOffset,
          operationsStream.shapeHandle!
        )
      }
    )

    let awarenessShapeUnsubscribe: (() => void) | undefined
    if (this.awarenessUpdates) {
      const awarenessStream = new ShapeStream<RowWithAwarenessUpdate>({
        ...this.awarenessUpdates.shape,
        ...this.resumeState.awareness,
        signal: abortController.signal,
      })

      awarenessShapeUnsubscribe = awarenessStream.subscribe((messages) => {
        this.awarenessShapeHandler(
          messages,
          awarenessStream.lastOffset,
          awarenessStream.shapeHandle!
        )
      })
    }

    this.unsubscribeShapes = () => {
      abortController.abort()
      operationsShapeUnsubscribe()
      awarenessShapeUnsubscribe?.()
      this.unsubscribeShapes = undefined
    }

    this.emit(`status`, [{ status: `connecting` }])
  }

  /**
   * Handles a batch of messages from the document shape stream.
   *
   * Change messages carry one or more length-prefixed Yjs updates that are
   * applied with origin `server`. An `up-to-date` control message records the
   * stream position in the resume state and marks the provider connected.
   */
  private operationsShapeHandler(
    messages: Message<RowWithDocumentUpdate>[],
    offset: Offset,
    handle: string
  ) {
    for (const message of messages) {
      if (isChangeMessage(message)) {
        const decoder = this.documentUpdates.getUpdateFromRow(message.value)
        while (decoder.pos !== decoder.arr.length) {
          const operation = decoding.readVarUint8Array(decoder)
          Y.applyUpdate(this.doc, operation, `server`)
        }
      } else if (
        isControlMessage(message) &&
        message.headers.control === `up-to-date`
      ) {
        this.resumeState.document = {
          offset,
          handle,
        }

        if (!this.sendingPendingChanges) {
          this.synced = true
          // The stable state vector seeds the resend queue on resume, so it
          // must not move past local changes that are still queued.
          if (!this.hasUnsentChanges()) {
            this.resumeState.stableStateVector = Y.encodeStateVector(this.doc)
          }
        }
        this.emit(`resumeState`, [this.resumeState])
        this.connected = true
      }
    }
  }

  // TODO: add an optional throttler that batches updates
  // before pushing to the server
  /** Document `update` listener: queues local updates and triggers a send. */
  private async applyDocumentUpdate(update: Uint8Array, origin: unknown) {
    // don't re-send updates from electric
    if (origin === `server`) {
      return
    }

    this.batch(update)
    void this.sendOperations()
  }

  /**
   * Sends queued document changes while connected. Only one loop runs at a
   * time. A failed send re-queues the payload and disconnects the provider.
   */
  private async sendOperations() {
    if (!this.connected || this.sendingPendingChanges) {
      return
    }

    this.sendingPendingChanges = true
    try {
      while (this.connected) {
        const sending = this.pendingChanges
        if (!sending || sending.length <= 2) {
          break
        }
        this.pendingChanges = null

        const encoder = encoding.createEncoder()
        encoding.writeVarUint8Array(encoder, sending)

        const success = await send(
          encoder,
          this.documentUpdates.sendUrl,
          this.fetchClient ?? fetch,
          this.documentUpdates.sendErrorRetryHandler
        )
        if (!success) {
          this.batch(sending)
          this.disconnect()
        }
      }

      // Only move stableStateVector forward when nothing is left queued. After
      // a failed send or a disconnect mid-loop the queue still holds changes
      // the server has not received.
      if (!this.hasUnsentChanges()) {
        this.resumeState.stableStateVector = Y.encodeStateVector(this.doc)
        this.emit(`resumeState`, [this.resumeState])
      }
    } finally {
      this.sendingPendingChanges = false
    }
  }

  /**
   * Awareness `update` listener: sends locally-originated awareness changes
   * while connected. Updates arriving during a send replace the pending one.
   * A failed send disconnects the provider.
   */
  private async applyAwarenessUpdate(
    awarenessUpdate: AwarenessUpdate,
    origin: unknown
  ) {
    const awareness = this.awarenessUpdates
    if (!awareness || origin !== `local` || !this.connected) {
      return
    }

    this.pendingAwarenessUpdate = awarenessUpdate

    if (this.sendingAwarenessState) {
      return
    }

    this.sendingAwarenessState = true

    try {
      while (this.pendingAwarenessUpdate && this.connected) {
        const update = this.pendingAwarenessUpdate
        this.pendingAwarenessUpdate = null

        const { added, updated, removed } = update
        const changedClients = added.concat(updated).concat(removed)
        const encoder = encoding.createEncoder()

        encoding.writeVarUint8Array(
          encoder,
          awarenessProtocol.encodeAwarenessUpdate(
            awareness.protocol,
            changedClients
          )
        )
        const success = await send(
          encoder,
          awareness.sendUrl,
          this.fetchClient ?? fetch,
          awareness.sendErrorRetryHandler
        )
        if (!success) {
          this.disconnect()
        }
      }
    } finally {
      this.sendingAwarenessState = false
    }
  }

  /**
   * Handles a batch of messages from the awareness shape stream.
   *
   * A delete removes the row's `client_id` from the awareness states with
   * origin `remote`; any other change applies the encoded awareness update
   * with origin `this`. An `up-to-date` control message records the stream
   * position in the resume state.
   */
  private awarenessShapeHandler(
    messages: Message<RowWithAwarenessUpdate>[],
    offset: Offset,
    handle: string
  ) {
    const awareness = this.awarenessUpdates
    if (!awareness) {
      return
    }

    for (const message of messages) {
      if (isChangeMessage(message)) {
        if (message.headers.operation === `delete`) {
          awarenessProtocol.removeAwarenessStates(
            awareness.protocol,
            [Number(message.value.client_id)],
            `remote`
          )
        } else {
          const decoder = awareness.getUpdateFromRow(message.value)
          awarenessProtocol.applyAwarenessUpdate(
            awareness.protocol,
            decoding.readVarUint8Array(decoder),
            this
          )
        }
      } else if (
        isControlMessage(message) &&
        message.headers.control === `up-to-date`
      ) {
        this.resumeState.awareness = {
          offset: offset,
          handle: handle,
        }
        this.emit(`resumeState`, [this.resumeState])
      }
    }
  }
}
/**
 * PUTs the encoder's bytes to `endpoint` as `application/octet-stream`.
 *
 * A thrown fetch error or a non-2xx response is reported to `retryHandler`
 * together with the response, if there was one. While the handler resolves
 * to `true` the same payload is sent again; the handler is responsible for
 * any back-off or attempt limit.
 *
 * @param encoder - Encoder holding the payload to send
 * @param endpoint - URL the payload is PUT to
 * @param fetchClient - fetch implementation used for the request
 * @param retryHandler - Optional callback deciding whether a failed send is retried
 * @returns `true` once the server answered with a 2xx status, `false` when
 *   the send failed and no retry was requested (or no handler was given)
 */
async function send(
  encoder: encoding.Encoder,
  endpoint: string | URL,
  fetchClient: typeof fetch,
  retryHandler?: SendErrorRetryHandler
): Promise<boolean> {
  const op = encoding.toUint8Array(encoder)

  for (;;) {
    // Reset per attempt so the handler never sees a stale response.
    let response: Response | undefined

    try {
      response = await fetchClient(endpoint, {
        method: `PUT`,
        headers: {
          'Content-Type': `application/octet-stream`,
        },
        body: op,
      })

      if (!response.ok) {
        throw new Error(`Server did not return 2xx`)
      }

      return true
    } catch (error) {
      const shouldRetry = (await retryHandler?.({ response, error })) ?? false
      if (!shouldRetry) {
        return false
      }
      // Retry requested: loop and re-send, rather than reporting success for
      // a payload the server never accepted.
    }
  }
}