Odyssey

Realtime data across your graph with federated subscriptions

10. Bonus! Replaying events
10m

Overview

What happens if we miss some messages? Sure, we could launch a new subscription after pre-populating our chat window with the older, unseen messages, but why not get everything from a single subscription?

We can do this by adding a cursor to our subscription field. We'll use this cursor to pass a timestamp for the last message received.

schema.graphql
type Subscription {
  # Start a new subscription for messages in a particular conversation, with an optional timestamp cursor to replay from
  listenForMessageInConversation(
    id: ID!
    fromMessageReceivedAt: String
  ): Message
}
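Because the cursor arrives as an optional String, the resolver has to turn it into a number before it can compare it against message timestamps. Here is a minimal sketch of that step. The helper name parseCursor is hypothetical and not part of the course code, and it assumes the cursor is either a string of digits (epoch milliseconds) or a date string that Date.parse understands.

```typescript
// Hypothetical helper (not from the course): convert the optional
// `fromMessageReceivedAt` cursor into epoch milliseconds.
// Returns null when no usable cursor was supplied.
function parseCursor(cursor?: string | null): number | null {
  if (cursor == null) return null;

  const trimmed = cursor.trim();
  if (trimmed === "") return null;

  // Assumption: a string of digits is epoch milliseconds;
  // anything else is treated as a date string.
  const ms = /^\d+$/.test(trimmed) ? Number(trimmed) : Date.parse(trimmed);

  // Reject NaN (unparseable input) and non-positive values.
  return Number.isFinite(ms) && ms > 0 ? ms : null;
}
```

A null result would mean "no replay, only listen for new messages", which mirrors the fallback path in the resolver.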

To make this work, we'll first need to publish our changes.

messages
rover subgraph publish APOLLO_GRAPH_REF \
  --name messages \
  --schema ./src/schema.graphql

Try swapping out our listenForMessageInConversation resolver for the following. This code uses an asynchronous generator to replay every message sent after the cursor timestamp, and then carries on listening for new ones.

import { Resolvers } from "../__generated__/resolvers-types";
import { NewMessageEvent } from "../datasources/models";
export const Subscription: Resolvers = {
  Subscription: {
    listenForMessageInConversation: {
      // @ts-ignore
      subscribe: async (
        _,
        { fromMessageReceivedAt, id },
        { pubsub, dataSources }
      ) => {
        // GOAL: If a cursor `fromMessageReceivedAt` is passed, fetch all messages sent after it
        // Convert the cursor to milliseconds (NaN if it is missing or unparseable)
        const timestampMs = new Date(fromMessageReceivedAt).getTime();
        // Validate that timestamp is a number, if so retrieve messages sent after that timestamp
        if (!isNaN(timestampMs) && timestampMs > 0) {
          const messages = await dataSources.messagesAPI.getMessagesAfterDate(
            timestampMs,
            id
          );
          return {
            // Set up the generator
            async *[Symbol.asyncIterator]() {
              console.log(
                "STEP 1: I am called the first time the subscription runs!"
              );
              // Initially, iterate through all the messages to "play back" what was missed
              // We're not awaiting NEW messages, just yielding the messages we already have in DB
              for (let i = 0; i < messages.length; i++) {
                yield { listenForMessageInConversation: messages[i] };
              }
              console.log(
                "STEP 2 TO INFINITY: creating a new iterator for each event"
              );
              // The thing we want to do with every new message
              let iterator = {
                [Symbol.asyncIterator]: () =>
                  pubsub.asyncIterator<NewMessageEvent>(["NEW_MESSAGE_SENT"]),
              };
              // The loop that awaits new message events and yields them
              for await (const event of iterator) {
                if (event.conversationId == id) {
                  yield event;
                }
              }
            },
          };
        }
        // If no timestamp is passed, handle new messages as we normally would
        return pubsub.asyncIterator(["NEW_MESSAGE_SENT"]);
      },
    },
  },
};
import { Resolvers } from "../__generated__/resolvers-types";
import { NewMessageEvent } from "../datasources/models";
export const Subscription: Resolvers = {
  Subscription: {
    listenForMessageInConversation: {
      // @ts-ignore
      subscribe: async (
        _,
        { fromMessageReceivedAt, id },
        { pubsub, dataSources }
      ) => {
        // GOAL: If a cursor `fromMessageReceivedAt` is passed, fetch all messages sent after it
        // Convert the cursor to milliseconds (NaN if it is missing or not numeric)
        const timestampMs = parseInt(fromMessageReceivedAt);
        // Validate that timestamp is a number, if so retrieve messages sent after that timestamp
        if (!isNaN(timestampMs) && timestampMs > 0) {
          const messages = await dataSources.db.getMessagesAfterDate(
            timestampMs,
            id
          );
          return {
            // Set up the generator
            async *[Symbol.asyncIterator]() {
              console.log(
                "STEP 1: I am called the first time the subscription runs!"
              );
              // Initially, iterate through all the messages to "play back" what was missed
              // We're not awaiting NEW messages, just yielding the messages we already have in DB
              for (let i = 0; i < messages.length; i++) {
                yield { listenForMessageInConversation: messages[i] };
              }
              console.log(
                "STEP 2 TO INFINITY: creating a new iterator for each event"
              );
              // The thing we want to do with every new message
              let iterator = {
                [Symbol.asyncIterator]: () =>
                  pubsub.asyncIterator<NewMessageEvent>(["NEW_MESSAGE_SENT"]),
              };
              // The loop that awaits new message events and yields them
              for await (const event of iterator) {
                if (event.conversationId == id) {
                  yield event;
                }
              }
            },
          };
        }
        // If no timestamp is passed, handle new messages as we normally would
        return pubsub.asyncIterator(["NEW_MESSAGE_SENT"]);
      },
    },
  },
};
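The replay-then-listen idea in the resolver can also be tried on its own, without a server. The sketch below is a simplified illustration, not the course's implementation: the Message shape, the in-memory stored array, and the liveEvents stand-in for a PubSub iterator are all assumptions made for the example.

```typescript
// Assumed message shape for this sketch only.
interface Message {
  conversationId: string;
  text: string;
  sentTime: number; // epoch milliseconds (assumption)
}

// Hypothetical stand-in for a live event source such as a PubSub iterator.
async function* liveEvents(events: Message[]): AsyncGenerator<Message> {
  for (const event of events) {
    yield event;
  }
}

// First yield stored messages newer than the cursor, oldest first,
// then hand over to the live source and yield matching events as they arrive.
async function* replayThenListen(
  stored: Message[],
  live: AsyncIterable<Message>,
  conversationId: string,
  cursorMs: number
): AsyncGenerator<Message> {
  const missed = stored
    .filter((m) => m.conversationId === conversationId && m.sentTime > cursorMs)
    .sort((a, b) => a.sentTime - b.sentTime);

  // Replay phase: nothing is awaited here, we already have these messages.
  for (const message of missed) {
    yield message;
  }

  // Live phase: await each new event and skip other conversations.
  for await (const event of live) {
    if (event.conversationId === conversationId) {
      yield event;
    }
  }
}

// Small helper that drains an async iterable into an array.
async function collect<T>(source: AsyncIterable<T>): Promise<T[]> {
  const items: T[] = [];
  for await (const item of source) {
    items.push(item);
  }
  return items;
}

// Sample data for a quick trial run.
const stored: Message[] = [
  { conversationId: "xeno-ripley-chat", text: "old", sentTime: 100 },
  { conversationId: "xeno-ripley-chat", text: "missed 2", sentTime: 300 },
  { conversationId: "xeno-ripley-chat", text: "missed 1", sentTime: 200 },
  { conversationId: "other-chat", text: "unrelated", sentTime: 250 },
];
const incoming: Message[] = [
  { conversationId: "other-chat", text: "ignored", sentTime: 400 },
  { conversationId: "xeno-ripley-chat", text: "live", sentTime: 500 },
];
```

Calling collect(replayThenListen(stored, liveEvents(incoming), "xeno-ripley-chat", 100)) resolves to the two missed messages in time order, followed by the live one. One design point to keep in mind: in a real server, anything published between the database read and the start of the live loop could slip through, so a production version may need to subscribe first and buffer.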

Running an operation

Let's jump back into Studio. We've sent a few messages now, so let's grab one of the older timestamps.

Run the following operation to access all of our past messages in the "xeno-ripley-chat" conversation.

query GetConversationWithRecipient($recipientId: ID!) {
  conversation(recipientId: $recipientId) {
    id
    createdAt
    messages {
      id
      text
      sentTime
    }
  }
}

And in the Variables panel:

{
  "recipientId": "ripley"
}

Make sure your Headers contain the following:

Authorization: Bearer xeno

We should see some output in the Response panel. Let's grab the sentTime value for that first message, and set up our subscription.

subscription SubscribeToMessages(
  $listenForMessageInConversationId: ID!
  $fromMessageReceivedAt: String
) {
  listenForMessageInConversation(
    id: $listenForMessageInConversationId
    fromMessageReceivedAt: $fromMessageReceivedAt
  ) {
    id
    text
  }
}

In the Variables panel, make sure you include both variables.

{
  "listenForMessageInConversationId": "xeno-ripley-chat",
  "fromMessageReceivedAt": "YOUR-TIMESTAMP-HERE"
}

Run the subscription and... we'll see immediate replay! Then we can proceed with sending messages to the conversation as normal.
