TypeScript client
The TypeScript client is a higher-level client interface that wraps the HTTP API to make it easy to sync Shapes in the web browser and other JavaScript environments.
Defined in packages/typescript-client, it provides a ShapeStream primitive to subscribe to a change stream and a Shape primitive to get the whole shape whenever it changes.
Install
The client is published on NPM as @electric-sql/client:
npm i @electric-sql/client
How to use
The client exports:
- a ShapeStream class for consuming a Shape Log; and
- a Shape class for materialising the log stream into a shape object
These compose together, e.g.:
import { ShapeStream, Shape } from '@electric-sql/client'
const stream = new ShapeStream({
url: `http://localhost:3000/v1/shape`,
params: {
table: 'items'
}
})
const shape = new Shape(stream)
// The callback runs every time the Shape data changes.
shape.subscribe(data => console.log(data))
ShapeStream
The ShapeStream is a low-level primitive for consuming a Shape Log.
Construct with a shape definition and options and then either subscribe to the shape log messages directly or pass into a Shape to materialise the stream into an object.
import { ShapeStream } from '@electric-sql/client'
// Passes subscribers rows as they're inserted, updated, or deleted
const stream = new ShapeStream({
url: `http://localhost:3000/v1/shape`,
params: {
table: `foo`
}
})
stream.subscribe(messages => {
// messages is an array with one or more row updates
// and the stream will wait for all subscribers to process them
// before proceeding
})
Options
The ShapeStream constructor takes the following options:
/**
* Options for constructing a ShapeStream.
*/
export interface ShapeStreamOptions<T = never> {
/**
* The full URL to where the Shape is hosted. This can either be the Electric
* server directly or a proxy. E.g. for a local Electric instance, you might
* set `http://localhost:3000/v1/shape`
*/
url: string
/**
* Which database to use.
* This is optional unless Electric is used with multiple databases.
*/
databaseId?: string
/**
* PostgreSQL-specific parameters for the shape.
* This includes table, where clause, columns, and replica settings.
*/
params: {
/**
* The root table for the shape.
*/
table: string
/**
* The where clauses for the shape.
*/
where?: string
/**
* The columns to include in the shape.
* Must include primary keys, and can only include valid columns.
*/
columns?: string[]
/**
* If `replica` is `default` (the default) then Electric will only send the
* changed columns in an update.
*
* If it's `full` Electric will send the entire row with both changed and
* unchanged values.
*
* Setting `replica` to `full` will obviously result in higher bandwidth
* usage and so is not recommended.
*/
replica?: Replica
/**
* Additional request parameters to attach to the URL.
* These will be merged with Electric's standard parameters.
*/
[key: string]: string | string[] | undefined
}
/**
* The "offset" on the shape log. This is typically not set as the ShapeStream
* will handle this automatically. A common scenario where you might pass an offset
* is if you're maintaining a local cache of the log. If you've gone offline
* and are re-starting a ShapeStream to catch-up to the latest state of the Shape,
* you'd pass in the last offset and shapeId you'd seen from the Electric server
* so it knows at what point in the shape to catch you up from.
*/
offset?: Offset
/**
* Similar to `offset`, this isn't typically used unless you're maintaining
* a cache of the shape log.
*/
shapeId?: string
/**
* HTTP headers to attach to requests made by the client.
* Can be used for adding authentication headers.
*/
headers?: Record<string, string>
/**
* Automatically fetch updates to the Shape. If you just want to sync the current
* shape and stop, pass false.
*/
subscribe?: boolean
/**
* Signal to abort the stream.
*/
signal?: AbortSignal
/**
* Custom fetch client implementation.
*/
fetchClient?: typeof fetch
/**
* Custom parser for handling specific Postgres data types.
*/
parser?: Parser<T>
/**
* A function for handling errors.
* This is optional. When it is not provided, any ShapeStream errors will be thrown.
* If the function returns an object containing parameters and/or headers,
* the ShapeStream will apply those changes and try syncing again.
* If the function returns void, the ShapeStream is stopped.
*/
onError?: ShapeStreamErrorHandler
backoffOptions?: BackoffOptions
}
type RetryOpts = {
params?: ParamsRecord
headers?: Record<string, string>
}
type ShapeStreamErrorHandler = (
error: Error
) => void | RetryOpts | Promise<void | RetryOpts>
Note that certain parameter names are reserved for Electric's internal use and cannot be used in custom params:
- offset
- handle
- live
- cursor
- database_id
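A pre-flight check can flag a reserved name before the params reach the constructor. The following is a minimal sketch under stated assumptions: `findReservedParams` is a hypothetical helper written for illustration, not part of the client, and the list of names is copied from the one above.

```typescript
// Reserved names, copied from the list above.
const RESERVED_PARAMS: string[] = ['offset', 'handle', 'live', 'cursor', 'database_id']

// Hypothetical helper (not part of @electric-sql/client):
// returns the reserved names that appear as keys in `params`.
function findReservedParams(params: Record<string, unknown>): string[] {
  return Object.keys(params).filter((key) => RESERVED_PARAMS.indexOf(key) !== -1)
}

// `cursor` is reserved, so it is reported; `customParam` is not.
const clash = findReservedParams({ table: 'items', cursor: '42' })
const clean = findReservedParams({ table: 'items', customParam: 'value' })
```

An empty result means none of the custom keys collide with Electric's own parameters.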
The following PostgreSQL-specific parameters should be included within the params object:
- table: The root table for the shape
- where: SQL where clause for filtering rows
- columns: List of columns to include
- replica: Controls whether to send full or partial row updates
Example with PostgreSQL-specific parameters:
const stream = new ShapeStream({
url: 'http://localhost:3000/v1/shape',
params: {
table: 'users',
where: 'age > 18',
columns: ['id', 'name', 'email'],
replica: 'full'
}
})
You can also include additional custom parameters in the params object alongside the PostgreSQL-specific ones:
const stream = new ShapeStream({
url: 'http://localhost:3000/v1/shape',
params: {
table: 'users',
customParam: 'value'
}
})
Dynamic Options
Both params and headers support function options that are resolved when needed. These functions can be synchronous or asynchronous:
const stream = new ShapeStream({
url: 'http://localhost:3000/v1/shape',
params: {
table: 'items',
userId: () => getCurrentUserId(),
filter: async () => await getUserPreferences()
},
headers: {
'Authorization': async () => `Bearer ${await getAccessToken()}`,
'X-Tenant-Id': () => getCurrentTenant()
}
})
Function options are resolved in parallel, making this pattern efficient for multiple async operations like fetching auth tokens and user context. Common use cases include:
- Authentication tokens that need to be refreshed
- User-specific parameters that may change
- Dynamic filtering based on current state
- Multi-tenant applications where context determines the request
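To illustrate what resolving function options in parallel can look like, here is a minimal sketch. It is not the client's implementation: `MaybeFn` and `resolveOptions` are hypothetical names, and the values in the usage lines are made up for the example.

```typescript
// A value may be static, or a sync/async function that returns the value.
type MaybeFn<T> = T | (() => T | Promise<T>)

// Hypothetical helper (not part of @electric-sql/client): resolves every
// entry concurrently with Promise.all rather than awaiting one by one.
async function resolveOptions(
  options: Record<string, MaybeFn<string>>
): Promise<Record<string, string>> {
  const entries = await Promise.all(
    Object.keys(options).map(async (key): Promise<[string, string]> => {
      const value = options[key]
      // Call function options; pass static values through unchanged.
      return [key, typeof value === 'function' ? await value() : value]
    })
  )
  const resolved: Record<string, string> = {}
  entries.forEach(([key, value]) => {
    resolved[key] = value
  })
  return resolved
}

// Static, synchronous and asynchronous options mixed together.
const pending = resolveOptions({
  table: 'items',
  userId: () => 'user-1',
  Authorization: async () => 'Bearer demo-token',
})
```

Because all functions start at once, the total wait is roughly that of the slowest option rather than the sum of all of them.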
Messages
A ShapeStream consumes and emits a stream of messages. These messages can either be a ChangeMessage representing a change to the shape data:
export type ChangeMessage<T extends Row<unknown> = Row> = {
key: string
value: T
headers: Header & { operation: `insert` | `update` | `delete` }
offset: Offset
}
Or a ControlMessage, representing an instruction to the client, as documented here.
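A subscriber usually needs to tell the two message kinds apart. The sketch below uses simplified local types rather than the client's own. The ControlMessage shape (a `control` header) is an assumption, since the text above only describes it as an instruction to the client, and `isChange` is a local helper defined here for illustration.

```typescript
type Row = Record<string, unknown>

// Simplified version of the ChangeMessage type shown above.
type ChangeMessage = {
  key: string
  value: Row
  headers: { operation: 'insert' | 'update' | 'delete' }
  offset: string
}

// Assumed shape: a control message carries a `control` header and no key.
type ControlMessage = { headers: { control: string } }

type Message = ChangeMessage | ControlMessage

// Local type guard: only change messages have a `key`.
function isChange(message: Message): message is ChangeMessage {
  return 'key' in message
}

// Counts change messages per operation and ignores control messages.
function countOperations(messages: Message[]): Record<string, number> {
  const counts: Record<string, number> = { insert: 0, update: 0, delete: 0 }
  for (const message of messages) {
    if (isChange(message)) counts[message.headers.operation] += 1
  }
  return counts
}

// Hypothetical batch: two changes followed by one control message.
const counts = countOperations([
  { key: 'items/1', value: { id: 1 }, headers: { operation: 'insert' }, offset: '0_0' },
  { key: 'items/1', value: { id: 1, done: true }, headers: { operation: 'update' }, offset: '0_1' },
  { headers: { control: 'up-to-date' } },
])
```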
Parsing
By default, when constructing a ChangeMessage.value, ShapeStream parses the following Postgres types into native JavaScript values:
- int2, int4, float4, and float8 are parsed into JavaScript Number
- int8 is parsed into a JavaScript BigInt
- bool is parsed into a JavaScript Boolean
- json and jsonb are parsed into JavaScript values/arrays/objects using JSON.parse
- Postgres Arrays are parsed into JavaScript arrays, e.g. "{{1,2},{3,4}}" is parsed into [[1,2],[3,4]]
All other types aren't parsed and are left in the string format as they were served by the HTTP endpoint.
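The rules above amount to a lookup from Postgres type name to a parsing function, with a string fallback. The following is a minimal sketch of that idea, not the client's actual parser: `sketchParsers`, `parseValue` and `parseNumericArray` are hypothetical names, and the array helper only handles unquoted numeric elements.

```typescript
type ParseFn = (value: string) => unknown

// One parsing function per Postgres type, mirroring the list above.
const sketchParsers: Record<string, ParseFn> = {
  int2: (v) => Number(v),
  int4: (v) => Number(v),
  float4: (v) => Number(v),
  float8: (v) => Number(v),
  int8: (v) => BigInt(v),
  bool: (v) => v === 'true',
  json: (v) => JSON.parse(v),
  jsonb: (v) => JSON.parse(v),
}

// Types without a parser are returned as the string they arrived as.
function parseValue(pgType: string, raw: string): unknown {
  const parse = sketchParsers[pgType]
  return parse ? parse(raw) : raw
}

// Simplified array handling: rewrites braces to brackets, which is only
// valid for (nested) arrays of unquoted numbers.
function parseNumericArray(literal: string): unknown {
  return JSON.parse(literal.replace(/\{/g, '[').replace(/\}/g, ']'))
}

const asNumber = parseValue('int4', '42')
const asBigInt = parseValue('int8', '9007199254740993')
const asBoolean = parseValue('bool', 'true')
const asObject = parseValue('jsonb', '{"a":1}') as { a: number }
const untouched = parseValue('text', 'hello')
const nested = parseNumericArray('{{1,2},{3,4}}')
```

The int8 case shows why BigInt is used: 9007199254740993 is larger than Number.MAX_SAFE_INTEGER and would lose precision as a Number.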
Custom parsing
You can extend this behaviour by configuring a custom parser. This is an object mapping Postgres types to parsing functions for those types. For example, we can extend the default parser to parse booleans into 1 or 0 instead of true or false:
const stream = new ShapeStream({
url: `http://localhost:3000/v1/shape`,
params: {
table: `foo`
},
parser: {
bool: (value: string) => value === `true` ? 1 : 0
}
})
Replica full
By default Electric sends the modified columns in an update message, not the complete row. To be specific:
- an insert operation contains the full row
- an update operation contains the primary key column(s) and the changed columns
- a delete operation contains just the primary key column(s)
If you'd like to receive the full row value for updates and deletes, you can set the replica option of your ShapeStream to full:
import { ShapeStream } from "@electric-sql/client"
const stream = new ShapeStream({
url: `http://localhost:3000/v1/shape`,
params: {
table: `foo`,
replica: `full`
}
})
This is less efficient and will use more bandwidth for the same shape (especially for tables with large static column values). Note also that shapes with different replica settings are distinct, even for the same table and where clause combination.
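To make the default-mode rules listed earlier in this section concrete, the sketch below computes which columns a message value would carry for each operation. `defaultReplicaValue` is a hypothetical helper, and the rows are made-up data. It only encodes the three rules and says nothing about how Electric produces messages internally.

```typescript
type Row = Record<string, unknown>
type Operation = 'insert' | 'update' | 'delete'

// Hypothetical helper: picks the columns carried in default replica mode.
function defaultReplicaValue(
  operation: Operation,
  primaryKey: string[],
  oldRow: Row,
  newRow: Row
): Row {
  // insert: the full row
  if (operation === 'insert') return { ...newRow }

  // update and delete: always include the primary key column(s);
  // for deletes they come from the row being removed.
  const source = operation === 'delete' ? oldRow : newRow
  const value: Row = {}
  for (const column of primaryKey) value[column] = source[column]

  // update: add only the columns whose value changed
  if (operation === 'update') {
    for (const column of Object.keys(newRow)) {
      if (newRow[column] !== oldRow[column]) value[column] = newRow[column]
    }
  }
  return value
}

// Made-up row with a large column that does not change.
const before: Row = { id: 1, title: 'Draft', body: 'long static text' }
const after: Row = { id: 1, title: 'Final', body: 'long static text' }

const updateValue = defaultReplicaValue('update', ['id'], before, after)
const deleteValue = defaultReplicaValue('delete', ['id'], after, {})
```

Here the update carries only `id` and `title`, while in full mode it would also resend the unchanged `body` column, which is where the extra bandwidth comes from.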
Authentication with Dynamic Tokens
When working with authentication tokens that need to be refreshed, the recommended approach is to use a function-based header:
import { ShapeStream, FetchError } from '@electric-sql/client'
const stream = new ShapeStream({
url: 'http://localhost:3000/v1/shape',
params: {
table: 'items'
},
headers: {
'Authorization': async () => `Bearer ${await getToken()}`
},
onError: async (error) => {
if (error instanceof FetchError && error.status === 401) {
// Force token refresh
await refreshToken()
// Return empty object to trigger a retry with the new token
// that will be fetched by our function-based header
return {}
}
// Rethrow errors we can't handle
throw error
}
})
This approach handles token refresh automatically, because the header function is called each time a request is made. Combining it with an onError handler, as in the example above, covers cases such as forcing a refresh after a 401 response.
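The contract of the error handler's return value (nothing means stop, an object means retry with those changes) can be sketched as a small pure function. This is an illustration under assumptions, not the client's code: `applyHandlerResult` and `RequestConfig` are hypothetical, `undefined` stands in for a void return, and merging the returned values over the current ones is an assumed reading of "apply those changes".

```typescript
type RetryOpts = {
  params?: Record<string, string>
  headers?: Record<string, string>
}

// Hypothetical container for what the next request would be sent with.
type RequestConfig = {
  params: Record<string, string>
  headers: Record<string, string>
}

// Returns null when the handler returned nothing (the stream stops),
// otherwise the config to retry with. Merging is an assumption.
function applyHandlerResult(
  current: RequestConfig,
  result: RetryOpts | undefined
): RequestConfig | null {
  if (result === undefined) return null
  return {
    params: { ...current.params, ...result.params },
    headers: { ...current.headers, ...result.headers },
  }
}

const base: RequestConfig = {
  params: { table: 'items' },
  headers: { Authorization: 'Bearer old' },
}

const stopped = applyHandlerResult(base, undefined)
const retriedUnchanged = applyHandlerResult(base, {})
const retriedWithToken = applyHandlerResult(base, {
  headers: { Authorization: 'Bearer new' },
})
```

Returning an empty object, as the example above does, keeps the static config unchanged and relies on the function-based header to supply the fresh token on the retry.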
Shape
The Shape is the main primitive for working with synced data.
It takes a ShapeStream, consumes the stream, materialises it into a Shape object and notifies you when this changes.
import { ShapeStream, Shape } from '@electric-sql/client'
const stream = new ShapeStream({
url: `http://localhost:3000/v1/shape`,
params: {
table: `foo`
}
})
const shape = new Shape(stream)
// Returns promise that resolves with the latest shape data once it's fully loaded
await shape.rows
// passes subscribers shape data when the shape updates
shape.subscribe(({ rows }) => {
// rows is an array of the latest value of each row in a shape.
})
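Materialising a log means folding its change messages into the current set of rows. The sketch below shows the idea with simplified local types. `materialise` is a hypothetical function, not the Shape class itself, and the keys and rows are made up.

```typescript
type Row = Record<string, unknown>

// Simplified change message: just the fields this sketch needs.
type ChangeMessage = {
  key: string
  value: Row
  headers: { operation: 'insert' | 'update' | 'delete' }
}

// Applies messages in log order to a map of rows keyed by message key.
function materialise(
  rows: Map<string, Row>,
  messages: ChangeMessage[]
): Map<string, Row> {
  for (const message of messages) {
    switch (message.headers.operation) {
      case 'insert':
        rows.set(message.key, message.value)
        break
      case 'update':
        // Updates may be partial, so merge them over the stored row.
        rows.set(message.key, { ...rows.get(message.key), ...message.value })
        break
      case 'delete':
        rows.delete(message.key)
        break
    }
  }
  return rows
}

const rows = materialise(new Map<string, Row>(), [
  { key: 'items/1', value: { id: 1, title: 'a' }, headers: { operation: 'insert' } },
  { key: 'items/2', value: { id: 2, title: 'b' }, headers: { operation: 'insert' } },
  { key: 'items/1', value: { title: 'x' }, headers: { operation: 'update' } },
  { key: 'items/2', value: { id: 2 }, headers: { operation: 'delete' } },
])
```

After the four messages only `items/1` remains, with its title updated and its `id` kept from the original insert.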
Subscribing to updates
The subscribe method allows you to receive updates whenever the shape changes. It takes two arguments:
- A message handler callback (required)
- An error handler callback (optional)
const stream = new ShapeStream({
url: 'http://localhost:3000/v1/shape',
params: {
table: 'issues'
}
})
// Subscribe to both message and error handlers
stream.subscribe(
(messages) => {
// Process messages
console.log('Received messages:', messages)
},
(error) => {
// Get notified about errors
console.error('Error in subscription:', error)
}
)
You can have multiple active subscriptions to the same stream. Each subscription will receive the same messages, and the stream will wait for all subscribers to process their messages before proceeding.
To stop receiving updates, you can either:
- Unsubscribe a specific subscription using the function returned by subscribe
- Unsubscribe all subscriptions using unsubscribeAll()
// Store the unsubscribe function
const unsubscribe = stream.subscribe(messages => {
console.log('Received messages:', messages)
})
// Later, unsubscribe this specific subscription
unsubscribe()
// Or unsubscribe all subscriptions
stream.unsubscribeAll()
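The subscribe and unsubscribe semantics described above follow a common registry pattern, sketched here in isolation. `SubscriberRegistry` is a hypothetical class, not the ShapeStream implementation, and it deliberately leaves out the waiting for subscribers to finish processing.

```typescript
type Callback<T> = (messages: T[]) => void

class SubscriberRegistry<T> {
  private subscribers = new Map<number, Callback<T>>()
  private nextId = 0

  // Registers a callback and returns a function that removes only it.
  subscribe(callback: Callback<T>): () => void {
    const id = this.nextId++
    this.subscribers.set(id, callback)
    return () => {
      this.subscribers.delete(id)
    }
  }

  // Removes every registered callback.
  unsubscribeAll(): void {
    this.subscribers.clear()
  }

  // Delivers the same batch of messages to each active subscriber.
  publish(messages: T[]): void {
    this.subscribers.forEach((callback) => callback(messages))
  }
}

const registry = new SubscriberRegistry<string>()
const seenA: string[] = []
const seenB: string[] = []

const stopA = registry.subscribe((messages) => {
  seenA.push(...messages)
})
registry.subscribe((messages) => {
  seenB.push(...messages)
})

registry.publish(['first']) // both subscribers receive it
stopA()
registry.publish(['second']) // only the second subscriber receives it
registry.unsubscribeAll()
registry.publish(['third']) // nobody receives it
```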
Error Handling
The ShapeStream provides two ways to handle errors:
- Using the onError handler (recommended):
import { ShapeStream, FetchError } from '@electric-sql/client'
const stream = new ShapeStream({
url: 'http://localhost:3000/v1/shape',
params: {
table: 'issues'
},
onError: (error) => {
// Handle all stream errors here
if (error instanceof FetchError) {
console.error('HTTP error:', error.status, error.message)
} else {
console.error('Stream error:', error)
}
}
})
If no onError handler is provided, the ShapeStream will throw errors that occur during streaming.
- Individual subscribers can optionally handle errors specific to their subscription:
stream.subscribe(
(messages) => {
// Process messages
},
(error) => {
// Handle errors for this specific subscription
console.error('Subscription error:', error)
}
)
Error Types
The following error types may be encountered:
Initialization Errors (thrown by constructor):
- MissingShapeUrlError: Missing required URL parameter
- InvalidSignalError: Invalid AbortSignal instance
- ReservedParamError: Using reserved parameter names
Runtime Errors (handled by onError or thrown):
- FetchError: HTTP errors during shape fetching
- FetchBackoffAbortError: Fetch aborted using AbortSignal
- MissingShapeHandleError: Missing required shape handle
- ParserNullValueError: Parser encountered NULL value in a column that doesn't allow NULL values
See the Demos and integrations for more usage examples.