createReactiveStoreFromDataPublisherFactory

function createReactiveStoreFromDataPublisherFactory<TData>(
    config,
): ReactiveStreamStore<TData>;

Returns a ReactiveStreamStore that wires itself to a fresh DataPublisher on every `connect()`.

The store accepts a createDataPublisher factory rather than a ready-made publisher — that lets the store tear down a broken stream and open a new one without losing subscribers or the last known value.

Things to note:

  • The returned store starts in status: 'idle'. Call connect() to open the first stream.
  • On error, the store transitions to status: 'error' preserving the last known value. Only the first error per connection window is captured — a subsequent connect() resets that window.
  • connect() aborts any currently active connection and invokes the factory again, transitioning through retrying (preserving stale data) when called from a non-idle state. If the factory rejects, the store transitions to status: 'error' with the rejection reason.
  • `reset()` aborts the current connection and returns to idle, clearing both data and error.
  • Triggering the caller's abortSignal disconnects the store permanently; subsequent connect() calls are no-ops.

Type Parameters

Type Parameter
TData

Parameters

ParameterTypeDescription
configFactoryConfig-

Returns

ReactiveStreamStore<TData>

Example

const store = createReactiveStoreFromDataPublisherFactory({
    abortSignal,
    async createDataPublisher() {
        return getDataPublisherFromEventEmitter(new WebSocket(url));
    },
    dataChannelName: 'message',
    errorChannelName: 'error',
});
const unsubscribe = store.subscribe(() => {
    const snapshot = store.getUnifiedState();
    if (snapshot.status === 'error') console.error('Connection failed:', snapshot.error);
    else if (snapshot.status === 'loaded') console.log('Latest:', snapshot.data);
});
store.connect();

On this page