google-cloud-node: Automatically create topic / subscription if not found.

Presently I have some code to publish to a particular topic, creating it if necessary:

function publish(message, cb){
  topic.publish(message, function(err){
    if(err && err.code == 404){
      logging.info("Creating pub/sub topic.");
      pubsub.createTopic(topicName, function(err, _){
        if(err) return cb(err);
        publish(message, cb);
      });
    } else {
      cb(err);
    }
  });
}

I have similar code to listen for messages on a topic. It works the other way around: it tries to create the subscription first, then reuses the existing one if it is already there.

function subscribe(cb){
  topic.subscribe(subscriptionName, function(err, subscription){
    if(!err) return wireSubscription(subscription);
    if(err && err.code == 409) return wireSubscription(topic.subscription(subscriptionName));
    throw err;
  });
  ...
}

According to this comment, it seems that the intended usage is to create the topic and subscription outside of the application context and never touch them afterwards. The documentation does not make it clear that this is expected.

I think it would be great to either:

  1. Consolidate subscribe/subscription and createTopic/topic into combined create-or-reuse functions (e.g. .topic() and .subscription()). An optional argument could make them return an error instead of reusing an existing resource. This hides the somewhat ugly logic above from users who want this use case.
  2. Explicitly document that topics and subscriptions should be created beforehand.

I would personally prefer the former as it removes the need to write a “bootstrapping” script and simplifies the user-facing API.
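To make the first option concrete, here is a minimal sketch of what a combined create-or-reuse helper might look like. Everything in it is hypothetical and only for illustration: the names getOrCreate, failIfExists, and ApiError are not part of the library, and the sketch assumes that "already exists" is reported as an error with code 409, as in the subscribe() example above.

```typescript
// Hypothetical shapes for illustration only; these are not the library's types.
interface ApiError extends Error {
  code?: number;
}

interface GetOrCreateOptions {
  // When true, an "already exists" error is rethrown instead of reusing.
  failIfExists?: boolean;
}

// Conflict code, as checked in the subscribe() example (assumption).
const ALREADY_EXISTS = 409;

// Try to create the resource first. If it already exists, fall back to a
// local reference, unless the caller asked for an error in that case.
async function getOrCreate<T>(
  create: () => Promise<T>,
  reference: () => T,
  options: GetOrCreateOptions = {}
): Promise<T> {
  try {
    return await create();
  } catch (err) {
    const code = (err as ApiError).code;
    if (code === ALREADY_EXISTS && !options.failIfExists) {
      return reference();
    }
    // Any other error, or a conflict the caller wants to see, is passed on.
    throw err;
  }
}
```

A caller would pass one function that issues the create request and one that builds a reference without any request. The happy path then costs a single request, and the reuse path costs no extra one.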

About this issue

  • State: closed
  • Created 9 years ago
  • Comments: 19 (11 by maintainers)

Most upvoted comments

I would like to share code to solve this problem.

import 'dotenv/config';
import {PubSub} from '@google-cloud/pubsub';

process.env.PUB_SUB_PREFIX = process.env.PUB_SUB_PREFIX || 'local';

process.env.SEARCH_REQUESTS_TOPIC = process.env.SEARCH_REQUESTS_TOPIC || 'search.requests';
process.env.SEARCH_REQUESTS_SUBSCRIPTION = process.env.SEARCH_REQUESTS_SUBSCRIPTION || process.env.SEARCH_REQUESTS_TOPIC + '-sub';

process.env.SEARCH_RESPONSES_TOPIC = process.env.SEARCH_RESPONSES_TOPIC || 'search.responses';
process.env.SEARCH_RESPONSES_SUBSCRIPTION = process.env.SEARCH_RESPONSES_SUBSCRIPTION || process.env.SEARCH_RESPONSES_TOPIC + '-sub';

process.env.CREATE_INDEX_REQUESTS_TOPIC = process.env.CREATE_INDEX_REQUESTS_TOPIC || 'index.create.requests';
process.env.CREATE_INDEX_REQUESTS_SUBSCRIPTION = process.env.CREATE_INDEX_REQUESTS_SUBSCRIPTION || process.env.CREATE_INDEX_REQUESTS_TOPIC + '-sub';

process.env.TRUNCATE_INDEX_REQUESTS_TOPIC = process.env.TRUNCATE_INDEX_REQUESTS_TOPIC || 'index.truncate.requests';
process.env.TRUNCATE_INDEX_REQUESTS_SUBSCRIPTION = process.env.TRUNCATE_INDEX_REQUESTS_SUBSCRIPTION || process.env.TRUNCATE_INDEX_REQUESTS_TOPIC + '-sub';

process.env.UPSERT_DOCUMENT_REQUESTS_TOPIC = process.env.UPSERT_DOCUMENT_REQUESTS_TOPIC || 'document.upsert.requests';
process.env.UPSERT_DOCUMENT_REQUESTS_SUBSCRIPTION = process.env.UPSERT_DOCUMENT_REQUESTS_SUBSCRIPTION || process.env.UPSERT_DOCUMENT_REQUESTS_TOPIC + '-sub';

process.env.DELETE_DOCUMENT_REQUESTS_TOPIC = process.env.DELETE_DOCUMENT_REQUESTS_TOPIC || 'document.delete.requests';
process.env.DELETE_DOCUMENT_REQUESTS_SUBSCRIPTION = process.env.DELETE_DOCUMENT_REQUESTS_SUBSCRIPTION || process.env.DELETE_DOCUMENT_REQUESTS_TOPIC + '-sub';


process.env.PUBSUB_EMULATOR_HOST = process.env.PUBSUB_EMULATOR_HOST || '0.0.0.0:8085';
process.env.PUBSUB_PROJECT_ID = process.env.PUBSUB_PROJECT_ID || 'pure-plaform';

const createTopicAndSubscription = async (pubsub: PubSub, {topic: topicName, subscription: subscriptionName}: {topic: string, subscription: string}) => {
    let topic = pubsub.topic(topicName);

    const [topicExists] = await topic.exists();

    if(!topicExists) {
        [topic] =  await pubsub.createTopic(topicName);
        console.log(`Topic created ${topicName}`);
    } else {
        console.log(`Topic exists ${topicName}`);
    }

    let subscription = topic.subscription(subscriptionName);

    const [subscriptionExists] = await subscription.exists();

    if(!subscriptionExists) {
        [subscription] = await topic.createSubscription(subscriptionName);
        console.log(`Subscription created ${subscriptionName}`);
    } else {
        console.log(`Subscription exists ${subscriptionName}`);
    }

    return {topic, subscription};
}

const main = async (): Promise<string> => {
    console.log("Creating pub/sub topics and subscriptions on project %s.", process.env.PUBSUB_PROJECT_ID);

    const client = new PubSub({projectId: process.env.PUBSUB_PROJECT_ID});

    const pubsubs = [
        { topic: process.env.SEARCH_REQUESTS_TOPIC, subscription: process.env.SEARCH_REQUESTS_SUBSCRIPTION },
        { topic: process.env.SEARCH_RESPONSES_TOPIC, subscription: process.env.SEARCH_RESPONSES_SUBSCRIPTION },
        { topic: process.env.CREATE_INDEX_REQUESTS_TOPIC, subscription: process.env.CREATE_INDEX_REQUESTS_SUBSCRIPTION },
        { topic: process.env.TRUNCATE_INDEX_REQUESTS_TOPIC, subscription: process.env.TRUNCATE_INDEX_REQUESTS_SUBSCRIPTION },
        { topic: process.env.UPSERT_DOCUMENT_REQUESTS_TOPIC, subscription: process.env.UPSERT_DOCUMENT_REQUESTS_SUBSCRIPTION },
        { topic: process.env.DELETE_DOCUMENT_REQUESTS_TOPIC, subscription: process.env.DELETE_DOCUMENT_REQUESTS_SUBSCRIPTION },
    ]

    for(const pubsub of pubsubs) {
        await createTopicAndSubscription(client, pubsub)
    }

    return 'Done';
}

main().then(console.log).catch(console.error);

After running it with

 ts-node .scripts/init-pubsub.ts

you can open the pages

http://localhost:8085/v1/projects/pure-plaform/topics
http://localhost:8085/v1/projects/pure-plaform/subscriptions

and see the created topics and subscriptions.

[Screenshot: 2021-12-15_22-14]

The approach we have taken for every object in this library is that you can create one or reference an existing one. If we changed this, it would need to be changed everywhere and would require two API requests rather than one, causing increased delays for responses.

Actually, in the latest Pub/Sub v1beta2 API (on this library’s master branch), the call to create a topic or subscription is a PUT, meaning it’s idempotent: no matter how many times you call it, it has no more effect than calling it once. This means you can effectively “create or get” by always creating the topic, whether or not it exists. But if we know we don’t need to create it, it would be silly to try every time we want to do something with it; that would double the time of every request and make things less stable overall.
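The cost argument can be illustrated with a small request counter. This is only a sketch under stated assumptions: FakeService is a hypothetical in-memory stand-in, not the real client, and it assumes an idempotent create and a 404-style error when publishing to a missing topic. It compares always creating before each publish with the publish-first approach from the original post.

```typescript
// Hypothetical in-memory stand-in for the service; it only counts requests.
class FakeService {
  requests = 0;
  private topics = new Set<string>();

  // Idempotent create: creating an existing topic has no further effect.
  async createTopic(name: string): Promise<void> {
    this.requests++;
    this.topics.add(name);
  }

  // Publishing to a missing topic fails with a 404-style error (assumption).
  async publish(name: string, _message: string): Promise<void> {
    this.requests++;
    if (!this.topics.has(name)) {
      throw Object.assign(new Error("topic not found"), { code: 404 });
    }
  }
}

type Strategy = (svc: FakeService, name: string, msg: string) => Promise<void>;

// Strategy 1: always "create or get" before each publish.
const publishAlwaysCreate: Strategy = async (svc, name, msg) => {
  await svc.createTopic(name);
  await svc.publish(name, msg);
};

// Strategy 2: publish first, create only after a 404, then retry once.
const publishCreateOnMiss: Strategy = async (svc, name, msg) => {
  try {
    await svc.publish(name, msg);
  } catch (err) {
    if ((err as { code?: number }).code !== 404) throw err;
    await svc.createTopic(name);
    await svc.publish(name, msg);
  }
};

// Send `count` messages to a fresh fake service and return the request total.
async function countRequests(strategy: Strategy, count: number): Promise<number> {
  const svc = new FakeService();
  for (let i = 0; i < count; i++) {
    await strategy(svc, "demo-topic", `message ${i}`);
  }
  return svc.requests;
}
```

For 10 messages, countRequests(publishAlwaysCreate, 10) resolves to 20 and countRequests(publishCreateOnMiss, 10) resolves to 12, since only the first publish pays for the failed attempt and the create.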

Hope this makes sense. Let me know if there’s a smart compromise that could be made here.