Skip to content

Redis

Example showing how to sync into Redis from Electric.

Syncing into a Redis data structure

Redis is often used as a cache. Electric can sync into Redis and automatically manage cache invalidation.

The main example code is in ./src/index.ts:

typescript
import { createClient } from 'redis'
import { ShapeStream, Message, isChangeMessage } from '@electric-sql/client'

// Create a Redis client
const REDIS_HOST = `localhost`
const REDIS_PORT = 6379
const client = createClient({
  url: `redis://${REDIS_HOST}:${REDIS_PORT}`,
})

client.connect().then(async () => {
  console.log(`Connected to Redis server`)

  // Clear out old data on the hash.
  client.del(`items`)

  // Lua script for updating hash field. We need to merge in partial updates
  // from the shape log.
  const script = `
      local current = redis.call('HGET', KEYS[1], KEYS[2])
      local parsed = {}
      if current then
        parsed = cjson.decode(current)
      end
      for k, v in pairs(cjson.decode(ARGV[1])) do
        parsed[k] = v
      end
      local updated = cjson.encode(parsed)
      return redis.call('HSET', KEYS[1], KEYS[2], updated)
    `

  // Load the script into Redis and get its SHA1 digest
  const updateKeyScriptSha1 = await client.SCRIPT_LOAD(script)

  const itemsStream = new ShapeStream({
    url: `http://localhost:3000/v1/shape`,
    params: {
      table: `items`,
    },
  })
  itemsStream.subscribe(async (messages: Message[]) => {
    // Begin a Redis transaction
    //
    // FIXME The Redis docs suggest only sending 10k commands at a time
    // to avoid excess memory usage buffering commands.
    const pipeline = client.multi()

    // Loop through each message and make writes to the Redis hash for action messages
    messages.forEach((message) => {
      if (!isChangeMessage(message)) return
      console.log(`message`, message)
      // Upsert/delete
      switch (message.headers.operation) {
        case `delete`:
          pipeline.hDel(`items`, message.key)
          break

        case `insert`:
          pipeline.hSet(
            `items`,
            String(message.key),
            JSON.stringify(message.value)
          )
          break

        case `update`: {
          pipeline.evalSha(updateKeyScriptSha1, {
            keys: [`items`, String(message.key)],
            arguments: [JSON.stringify(message.value)],
          })
          break
        }
      }
    })

    // Execute all commands as a single transaction
    try {
      await pipeline.exec()
      console.log(`Redis hash updated successfully with latest shape updates`)
    } catch (error) {
      console.error(`Error while updating hash:`, error)
    }
  })
})