May 13, 2021

Using Subscriptions with Your Federated Data Graph

Mandi Wise
@mandiwise
Backend, Federation, Subscriptions
Last updated May 27, 2021

Apollo Federation allows teams to take a declarative approach to building out distributed GraphQL architectures. What’s more, it allows teams to divide and manage portions of the data graph’s schema based on separation of concerns, so they can work independently on different products and features that are powered by a single data graph.

This approach to data graph ownership can have a transformational effect on the way engineering teams work together to expose product-driven query and mutation operations in a GraphQL API, but out-of-the-box, Apollo Federation doesn’t support subscription operations.

But that doesn’t necessarily mean that we can’t use GraphQL subscriptions with a federated schema. In this post, I’ll demonstrate that with a bit of architectural creativity and by leveraging some of Apollo Gateway’s built-in features, we can design a decoupled, scalable solution that allows us to run subscription operations against a federated data graph.

But First, an Important Question

Before we explore how a federated data graph may be extended to support subscription operations, we have an important question to consider. And I would say it’s important to answer this question for federated and non-federated GraphQL APIs alike. The question is:

Are subscriptions truly the right way to support the real-time feature that you need to implement?

When thinking about querying data in real-time via a GraphQL API, subscription operations may seem like a default option because they are included directly in the GraphQL specification. There are many popular GraphQL servers and other libraries that also support subscriptions (usually via a WebSocket connection), so the barriers to getting up and running with them are relatively low. But alas, there’s no such thing as full-duplex free lunch!

For example, maintaining an open socket connection to receive updates from a subscription operation will have battery usage implications for mobile devices. So before adding the Subscription type to any GraphQL schema, it’s always worth asking if polling for less frequent updates or push notifications on mobile would be a more suitable solution.

However, if GraphQL subscriptions are the appropriate technical answer for the feature you need to implement, then at this point we can turn our attention to using what’s available in Apollo Gateway to support those subscription operations against a federated data graph. What’s more, we’ll do so in a way that will feel as seamless as it does with a monolithic GraphQL API for the clients that consume the graph.

Treating Subscriptions as a Separate Concern

A key feature of Apollo Federation is that it was designed to support decoupled and distributed GraphQL architectures. However, the stateless HTTP transport layer we typically use to support query and mutation operations may need to scale in a different way than stateful WebSocket connections do.

In a standard approach to implementing subscriptions for a GraphQL API, we often end up coupling these two transport concerns together. In the spirit of decoupling, our federated data graph would ideally remain the stateless execution engine of the API. Alongside it, we could have a separate and dedicated subscription service that’s responsible for maintaining stateful connections with clients, as well as processing and responding to their requests.

When taking this approach, some practical considerations will help us produce a workable outcome for client and server developers. First, if the subscriptions are managed in a separate service, then we need some way to use the same types that are defined in the federated data graph as the output types for subscription fields. By doing so, client developers can write subscription operations in a way that reflects the natural relationships between types in your federated data graph.

Second, if the type definitions from the federated data graph are included in the subscription service’s schema, then we’ll need a way to keep the schema in sync between them automatically.

Lastly, we’ll also need a way to resolve additional data requested by a client when that data isn’t provided in the message payload that was published by a subgraph service to a shared pub/sub implementation. For these requests, the subscription service will need to reach out to the federated data graph to query the missing data so that it may fully resolve all of the fields in the subscription operation’s selection set.

With these requirements in mind, we’re ready to start building a decoupled subscription service in the next section.

Subscription Service for a Live Blog

Let’s imagine a scenario where we need to build a live blog powered by Apollo Federation. This data graph will have an authors subgraph and a posts subgraph. We’ll need a single subscription operation to push new posts to subscribed clients immediately as those posts become available. In addition to the basic post data, we need to traverse the graph to provide some additional details about the post author in the subscription response.

These are the key architectural components in our application:

  1. An Apollo Gateway that composes the authors and posts subgraphs using managed federation
  2. A decoupled subscription service
  3. A shared instance of Redis used for pub/sub
  4. An Apollo Client-powered React app

To keep things concise, the code examples that follow are key highlights from the full solution. You can view an example of the working live blog in this repository.

The posts subgraph will publish a POST_ADDED message to Redis after a new post is created with a mutation. The subscription service will subscribe to the same POST_ADDED messages from Redis, and it will make requests to the federated data graph to fetch any additional data needed to fully resolve a postAdded subscription operation for a client.

We use Redis pub/sub for subscriptions instead of a basic in-memory pub/sub implementation because any of the subgraph services may need to publish messages and the subscription service will need to be able to receive those messages. These services won’t be running on the same server, so an in-memory approach won’t work (plus it’s a best practice to not rely on in-memory pub/sub when using subscriptions in production).

For our application, we’ll use the GraphQL Redis Subscriptions package to connect to Redis from the posts service:

// Post Service

import { RedisPubSub } from "graphql-redis-subscriptions";
import Redis from "ioredis";

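export const redis = new Redis(6379, "127.0.0.1");

export const pubsub = new RedisPubSub({
  publisher: redis,
  // A connection in subscriber mode can't run other commands (like PUBLISH),
  // so the subscriber gets its own connection
  subscriber: redis.duplicate()
});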

In the posts subgraph schema below, note that we reference an Author entity that originates in an authors subgraph. We use the Author type as the return type for the author field on the Post type, and we reference an author based on the id key field. There’s also an addPost mutation and we’ll use its resolver to publish a message to Redis about a new post’s availability.

# Posts Service

type Post {
  id: ID!
  author: Author!
  content: String!
  publishedAt: String!
  title: String!
}

extend type Author @key(fields: "id") {
  id: ID! @external
}

extend type Query {
  posts: [Post]
}

extend type Mutation {
  addPost(authorID: ID!, content: String, title: String): Post
}

In the addPost resolver, we publish a POST_ADDED message with a payload containing some of the new post’s data, but we don’t have any information about the post author in the post object apart from their ID:

// Post Service

import { pubsub } from "./redis";

const resolvers = {
  Query: {
    // ...
  },
  Mutation: {
    addPost(root, args, context, info) {
      const post = newPost(args);
      pubsub.publish("POST_ADDED", { postAdded: post });
      return post;
    }
  }
};
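
To make the message flow concrete, here is a minimal sketch that swaps Redis for a tiny in-memory stand-in. The `createPubSub` helper and the sample post are hypothetical and exist only for illustration (as noted above, real services need a shared broker). It shows that the payload is wrapped in an object keyed by the subscription field name, `postAdded`, and that it carries only the author’s ID:

```javascript
// Hypothetical in-memory stand-in for the Redis-backed pubsub (illustration only)
function createPubSub() {
  const listeners = new Map();
  return {
    subscribe(label, listener) {
      const current = listeners.get(label) || [];
      listeners.set(label, [...current, listener]);
    },
    publish(label, payload) {
      (listeners.get(label) || []).forEach(listener => listener(payload));
    }
  };
}

const demoPubSub = createPubSub();
const received = [];

// What the subscription service would do: listen for POST_ADDED messages
demoPubSub.subscribe("POST_ADDED", payload => received.push(payload));

// What the addPost resolver would do: publish the new post under the
// `postAdded` key so it lines up with the Subscription field name
const samplePost = {
  id: "1",
  authorID: "1", // only the author's ID is known here, not their name
  content: "Hello, world!",
  publishedAt: "2021-05-13T00:00:00.000Z",
  title: "Breaking News"
};
demoPubSub.publish("POST_ADDED", { postAdded: samplePost });

console.log(received[0].postAdded.title);
```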

That’s all we’ll need to do in the posts service, so we can turn our attention to the subscription service now. The type definitions that we define here manually will only contain the Subscription type and any fields we want to add to that type.

However, because we’ll combine the types from the federated data graph into this schema in a moment, we’ll be able to use the types defined in the federated data graph as output types for the Subscription fields even though they are not defined here explicitly. So for the postAdded field, we use the Post type that’s owned by the posts subgraph as its return type:

# Subscription Service

type Subscription {
  postAdded: Post
}

This is where things start to get interesting. To get the schema for the federated data graph, we can instantiate an ApolloGateway object directly in the subscription service. Recall that we’re using managed federation, so there would be an APOLLO_KEY variable set in the environment and we don’t provide a serviceList option in its constructor.

Also, note that this Apollo Gateway is a little bit different from what we’re used to seeing because we’re not going to pass the instantiated gateway object into an Apollo Server. Instead, we’ll set an onSchemaChange callback on it. That callback will take the gatewaySchema and combine it with the subscription service’s type definitions and resolvers using the makeSubscriptionSchema function (found in the demonstration repo linked above) to make an executable schema for this service to run its subscription operations against:

// Subscription Service

import { ApolloGateway } from "@apollo/gateway";

import { makeSubscriptionSchema } from "federation-subscription-tools";

const gateway = new ApolloGateway();
let schema;

(async () => {
  gateway.onSchemaChange(gatewaySchema => {
    schema = makeSubscriptionSchema({
      gatewaySchema, 
      typeDefs, 
      resolvers 
    });
  });

  await gateway.load({ 
    apollo: getGatewayApolloConfig(apolloKey, graphVariant)
  });
})();

Over in the resolvers for the subscription fields, we also set up a client to access the shared Redis instance, but here we listen for POST_ADDED messages instead:

// Subscription Service

import { pubsub } from "./redis";

const resolvers = {
  Subscription: {
    postAdded: {
      subscribe(_, args) {
        return pubsub.asyncIterator(["POST_ADDED"]);
      }
    }
  }
};

If all we ever needed to do was resolve fields that are available in the message payload provided by the posts service, then our work would be done. However, the payload doesn’t include all of the author details we need, so we need some way to resolve that additional data when requested by clients.

When a new POST_ADDED message is received, the payload provides the post’s id, content, publishedAt, and title field values, along with the author’s ID. Any other author fields that a client requests, such as name, still need to be fetched from the federated data graph to fully resolve the operation.

To resolve the additional fields from the subscription service, we could define resolvers on the Post type on a per-field basis to fetch data from the gateway. However, this may not be very efficient in terms of network requests, nor would it be very future-proof as the graph evolves.

A more maintainable approach would be to use the generalized resolve method inside of the resolver for the postAdded subscription field to fetch all of the additional required data. The resolve method allows us to intercept the message payload and alter it before resolving the operation.

To fetch the required data, we can create an Apollo data source that’s capable of diffing the provided payload fields with the actual field selections in the operation, and then query the federated data graph for just those missing fields in a single request:

// Subscription Service

const resolvers = {
  Subscription: {
    postAdded: {
      resolve(payload, args, { dataSources: { gatewayApi } }, info) {
        return gatewayApi.fetchAndMergeNonPayloadPostData(
          payload.postAdded.id, // unique identifier to fetch the post
          payload, // known field values
          info // contains the full selection set of fields to diff
        );
      },
      subscribe(_, args) {
        return pubsub.asyncIterator(["POST_ADDED"]);
      }
    }
  }
};

Using this approach, the data source provides a method to fetch a single post from the federated data graph and this post can be used as an entry point to all other relationships it has to other nodes in the data graph.
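
The diffing step can be sketched without any GraphQL machinery. The following is a simplified illustration rather than the data source’s actual implementation: `findMissingFields` and `mergePayload` are hypothetical names, the selection set is represented as a plain nested object instead of the AST found on `info`, and the fetched author is made-up sample data.

```javascript
// Hypothetical helper: list the selected field paths the payload can't satisfy.
// `selection` maps field names to `true` (leaf) or a nested selection object.
function findMissingFields(selection, payload, path = []) {
  const missing = [];
  for (const [field, sub] of Object.entries(selection)) {
    const value = payload == null ? undefined : payload[field];
    const fieldPath = [...path, field];
    if (value === undefined) {
      missing.push(fieldPath.join("."));
    } else if (sub !== true) {
      missing.push(...findMissingFields(sub, value, fieldPath));
    }
  }
  return missing;
}

// Hypothetical helper: combine fetched fields with the known payload values
function mergePayload(payload, fetched) {
  return { ...fetched, ...payload };
}

// Mirrors the fields a client might select on `postAdded`
const selection = {
  author: { id: true, name: true },
  content: true,
  id: true,
  publishedAt: true,
  title: true
};

// Mirrors the message published by the posts service
const postPayload = {
  id: "1",
  authorID: "1",
  content: "Hello, world!",
  publishedAt: "2021-05-13T00:00:00.000Z",
  title: "Breaking News"
};

const missing = findMissingFields(selection, postPayload);
console.log(missing); // [ 'author' ]

// Stand-in for the gateway's response to a query for the missing fields
const fetched = { author: { id: "1", name: "Alice" } };
const merged = mergePayload(postPayload, fetched);
console.log(findMissingFields(selection, merged)); // []
```

Only the `author` subtree is reported as missing, so a single request to the gateway for that subtree is enough to complete the response.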

The last step is to use the graphql-ws library to fire up a WebSocket server that clients can send subscription operations to. Using its useServer function, we can set the GatewayDataSource on the context so that it’s available to the resolvers.

We then use its onSubscribe callback to set the execution arguments for each subscription, including the current value of the schema variable we previously set in the gateway onSchemaChange callback. In the onSubscribe callback, we also return a GraphQL error if a client tries to send a query or mutation operation to this WebSocket-powered endpoint instead of the usual HTTP endpoint for the federated data graph:

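// Subscription Service

import http from "http";

import {
  execute,
  getOperationAST,
  GraphQLError,
  parse,
  subscribe,
  validate
} from "graphql";
import { useServer } from "graphql-ws/lib/use/ws";
import ws from "ws";

(async () => {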
  // ...

  const httpServer = http.createServer(function weServeSocketsOnly(_, res) {
    res.writeHead(404);
    res.end();
  });

  const wsServer = new ws.Server({
    server: httpServer,
    path: "/graphql"
  });

  useServer(
    {
      execute,
      subscribe,
      context: ctx => {
        // Instantiate and initialize the GatewayDataSource subclass
        // (data source methods will be accessible on the `gatewayApi` key)
        const ds = new LiveBlogDataSource(gatewayEndpoint);
        const context = addGatewayDataSourceToSubscriptionContext(ctx, ds);
        return context;
      },
      onSubscribe: (_ctx, msg) => {
        // Construct the execution arguments
        const args = {
          schema, // <-- Use the previously defined `schema` here
          operationName: msg.payload.operationName,
          document: parse(msg.payload.query),
          variableValues: msg.payload.variables
        };

        const operationAST = getOperationAST(
          args.document, 
          args.operationName
        );

        // Stops the subscription and sends an error message
        if (!operationAST) {
          return [new GraphQLError("Unable to identify operation")];
        }

        // Handle mutation and query requests
        if (operationAST.operation !== "subscription") {
          return [
            new GraphQLError("Only subscription operations are supported")
          ];
        }

        // Validate the operation document
        const errors = validate(args.schema, args.document);

        if (errors.length > 0) {
          return errors;
        }

        // Ready execution arguments
        return args;
      }
    },
    wsServer
  );

  httpServer.listen({ port }, () => {
    console.log(
      `🚀 Subscriptions ready at ws://localhost:${port}${wsServer.options.path}`
    );
  });
})();
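
One detail the snippet above leaves implicit is what happens if a client subscribes before the gateway’s first schema load has completed, when `schema` is still undefined. The sketch below shows one way to guard against that. It is an assumption about how you might handle this case rather than part of the original solution, `setSchema` and `buildExecutionArgs` are hypothetical names, and a plain `Error` stands in for `GraphQLError` to keep the example dependency-free.

```javascript
let currentSchema; // replaced every time the gateway reports a schema change

// Would be called from the gateway's onSchemaChange callback
function setSchema(nextSchema) {
  currentSchema = nextSchema;
}

// Would be called at the top of onSubscribe. Returning an array of errors
// mirrors how the callback above rejects a subscription.
function buildExecutionArgs(msg) {
  if (!currentSchema) {
    return [new Error("Schema is not available yet, please retry")];
  }
  return {
    schema: currentSchema, // always the most recently composed schema
    operationName: msg.payload.operationName,
    variableValues: msg.payload.variables
  };
}

const sampleMsg = { payload: { operationName: "PostAdded", variables: {} } };

const early = buildExecutionArgs(sampleMsg);
console.log(Array.isArray(early)); // true

setSchema({ version: 1 }); // stand-in object for an executable schema
const ready = buildExecutionArgs(sampleMsg);
console.log(ready.schema.version); // 1
```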

Over on the client side, we can now create a WebSocketLink for Apollo Client that is based on the link example from the graphql-ws documentation. That link will point to the endpoint for the subscription service and the HttpLink will point directly to the gateway endpoint. Using Apollo Client’s split function, requests can then be directed to either endpoint based on the operation type:

// React App

// The WebSocket URL includes the "/graphql" path set on the WebSocket server
const wsLink = new WebSocketLink({ url: "ws://localhost:5000/graphql" });
const httpLink = new HttpLink({ uri: "http://localhost:4000/" });

const link = split(
  ({ query }) => {
    const definition = getMainDefinition(query);
    return (
      definition.kind === "OperationDefinition" &&
      definition.operation === "subscription"
    );
  },
  wsLink,
  httpLink
);

const client = new ApolloClient({ cache, link });

The Apollo Link configuration above works exactly as it would when executing subscriptions against a monolithic GraphQL API, so from the client’s perspective, everything is business as usual for running subscription operations. That means that a client can send a mutation to create a new post such as this one:

mutation AddPost {
  addPost(authorID: 1, content: "Hello, world!", title: "Breaking News") {
    id
    author {
      name
    }
    content
    publishedAt
    title
  }
}

In turn, the posts subgraph service will publish a message to Redis containing the authorID, content, id, publishedAt time, and title of the new post. Other subscribed clients can then use subscription operations to request data about the post and the related author in the same way that they could for query and mutation operations:

subscription PostAdded {
  postAdded {
    author {
      id
      name
    } # <-- Fetched from the federated data graph
    content
    id
    publishedAt
    title
  }
}

Considerations

When implementing this solution, there are a few important considerations and cross-cutting concerns to keep in mind. First, this approach requires all Subscription fields to be defined in a single, decoupled subscription service, so the ownership of this service will be shared across teams that manage independent portions of the schema that are applicable to queries and mutations only.

Second, some level of coordination would be necessary to ensure that event labels (such as the POST_ADDED label) are synchronized between the subgraphs that publish events and the subscription service that subscribes to those events to avoid breaking changes.
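
One lightweight way to coordinate labels is to define them once in a module that both the subgraphs and the subscription service import. This is a sketch under the assumption that the services can share a small internal package; the `EVENTS` and `eventLabel` names are hypothetical.

```javascript
// Hypothetical shared module, e.g. published as an internal package
const EVENTS = Object.freeze({
  POST_ADDED: "POST_ADDED"
});

// Look up a label, failing loudly on typos instead of silently
// publishing to a channel that nobody listens on
function eventLabel(name) {
  if (!Object.prototype.hasOwnProperty.call(EVENTS, name)) {
    throw new Error(`Unknown event label: ${name}`);
  }
  return EVENTS[name];
}

console.log(eventLabel("POST_ADDED")); // POST_ADDED
```

The posts service would then publish with `eventLabel("POST_ADDED")` and the subscription service would pass the same value to `pubsub.asyncIterator`, so renaming a label becomes a single, reviewable change.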

Third, to improve real-time performance and minimize the number of requests from the subscription service back to the gateway to resolve non-payload fields, some form of caching would likely be desirable. You may also want to encourage client developers to avoid over-fetching data in subscription operations and to leverage the Apollo Client cache wherever possible.
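
As a rough illustration of the caching idea, the sketch below wraps a lookup function in a small time-based cache so that repeated requests for the same author within a short window skip the round trip to the gateway. The `cacheWithTtl` helper, the `fetchAuthor` stand-in, and the 5-second TTL are all hypothetical; a real deployment might instead use a shared cache so that every instance of the subscription service benefits.

```javascript
// Hypothetical helper: memoize `fetchFn` results per key for `ttlMs` milliseconds
function cacheWithTtl(fetchFn, ttlMs, now = Date.now) {
  const entries = new Map();
  return key => {
    const hit = entries.get(key);
    if (hit && now() - hit.storedAt < ttlMs) {
      return hit.value;
    }
    const value = fetchFn(key);
    entries.set(key, { value, storedAt: now() });
    return value;
  };
}

// Synchronous stand-in for a request to the gateway
let gatewayCalls = 0;
function fetchAuthor(id) {
  gatewayCalls += 1;
  return { id, name: `Author ${id}` };
}

let clock = 0; // fake clock so the example is deterministic
const getAuthor = cacheWithTtl(fetchAuthor, 5000, () => clock);

getAuthor("1");
getAuthor("1"); // served from the cache
clock = 6000;
getAuthor("1"); // entry expired, fetched again
console.log(gatewayCalls); // 2
```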

Lastly, removing a type from the federated data graph that the subscription service uses as an output type will be a breaking change unless the type removal happens simultaneously in both services. Good schema governance practices are a must, and it would also be a good idea to leverage your observability tools here to understand what operations are being executed against your federated data graph from the subscription service.

Summary

In this tutorial, we saw how we can get the best of both worlds by isolating a subscription service as a decoupled, independently scalable service while still running subscription operations against a federated data graph's full schema.

You can find the complete code for this tutorial on GitHub as a demonstration library and you can also watch my GraphQL Summit talk about this solution on-demand.

For more details on what’s possible with Apollo Federation, be sure to visit the docs.
