
FastStream: Python's Framework for Efficient Message Queue Handling

Posted on Oct 16

Ever felt lost in the complexity of microservices and message queues like Kafka and RabbitMQ? FastStream is here to simplify it all. That's precisely why we created FastStream. Initially, it was our solution to the challenges we faced with messaging queues in our own projects. But as it simplified our lives, we realized it could do the same for others. So, we decided to share it with the world.

FastStream streamlines the entire process of working with message queues in microservices. Parsing messages, managing networking, and keeping documentation updated are all handled for you.

In this blog post, we'll explore how FastStream simplifies microservices development. Let's dive in and discover how FastStream can revolutionize your workflow.

Hint: If you want to dive into the code right away, check out the hands-on tutorial in the FastStream documentation.

Our journey with FastStream started when we needed to integrate our machine learning models into a customer's Apache Kafka environment. To streamline this process, we created FastKafka using AIOKafka, AsyncAPI, and asyncio. It was our first step in making message queue management easier.

Later, we discovered Propan, a library created by Nikita Pastukhov, which solved similar problems but for RabbitMQ. Recognizing the potential for collaboration, we joined forces with Nikita to build a unified library that could work seamlessly with both Kafka and RabbitMQ. And that's how FastStream came to be: a solution born out of the need for simplicity and efficiency in microservices development.

FastStream is more than just another library; it's a powerful toolkit designed to simplify and supercharge your microservices development. Let's dive into the key features that make FastStream stand out:

Multiple Broker Support: FastStream provides a unified API that works seamlessly across multiple message brokers.
Whether you're dealing with Kafka, RabbitMQ, or others, FastStream has you covered, making it easy to switch between them.

Pydantic Validation: Leverage the robust validation capabilities of Pydantic to parse and validate incoming messages. With Pydantic, you can ensure that your data is always in the right format.

Automatic Documentation: FastStream keeps you ahead of the game with automatic AsyncAPI documentation generation. Say goodbye to outdated documentation: FastStream ensures it's always up to date.

Intuitive Development: FastStream offers fully typed editor support, catching errors before they reach runtime. This means you can code with confidence, knowing that issues are caught early in the development process.

Powerful Dependency Injection System: Manage your service dependencies efficiently with FastStream's built-in Dependency Injection (DI) system. Say goodbye to spaghetti code and embrace clean, modular architecture.

Testability: FastStream supports in-memory tests, making your Continuous Integration and Continuous Deployment (CI/CD) pipeline faster and more reliable. Test your microservices with ease, ensuring they perform as expected.

Seamless Integrations: FastStream plays well with others. It's fully compatible with any HTTP framework you prefer, with a special emphasis on compatibility with FastAPI.

Built for Automatic Code Generation: FastStream is optimized for automatic code generation using advanced models like GPT. This means you can leverage the power of code generation to boost your productivity. Check out the tool we built for microservice code generation: faststream-gen.

FastStream, in a nutshell, offers ease, efficiency, and power in your microservices development journey. Whether you're just starting or looking to scale your microservices, FastStream is your trusted companion.
With these core features at your disposal, you'll be well-equipped to tackle the challenges of modern, data-centric microservices.

Now, let's get our hands a bit dirty 👷.

Let's implement an example Python app using FastStream that consumes names from the "persons" topic and outputs greetings to the "greetings" topic.

To start our project, we will use the prepared cookiecutter FastStream project. To find out more about it, check our detailed guide.

Install the cookiecutter package using the following command:

Now, run the provided cookiecutter command and fill out the relevant details to generate a new FastStream project. We will name this project "greetings_app":

The creation process should look like this:

Change the working directory to the newly created directory and install all development requirements using pip:

Now we are ready to edit our greetings_app/application.py and tests/test_application.py files to implement our application logic.

FastStream brokers provide convenient function decorators, @broker.subscriber and @broker.publisher, that let you delegate the actual process of consuming and producing data to event queues, and of decoding and encoding JSON-encoded messages.

These decorators make it easy to specify the processing logic for your consumers and producers, allowing you to focus on the core business logic of your application without worrying about the underlying integration.

Also, FastStream uses Pydantic to parse input JSON-encoded data into Python objects, making it easy to work with structured data in your applications, so you can deserialize your input messages just by using type annotations.

Here is an example Python app we talked about:

The example application will subscribe to the persons Kafka topic and consume Name JSON messages from it.
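
To make the decorator idea concrete, here is a dependency-free toy model of the pattern described above. It is a sketch only and does not use FastStream's API: ToyBroker, its subscriber(topic, model) signature, the sent log, and on_person are hypothetical names invented for illustration, and standard-library dataclasses stand in for Pydantic models. What it shows is the division of labour: the broker object decodes and encodes JSON and routes messages, while the handler contains only business logic.

```python
import asyncio
import json
from collections import defaultdict
from dataclasses import asdict, dataclass


@dataclass
class Name:
    name: str


@dataclass
class Greetings:
    greeting: str


class ToyBroker:
    """Hypothetical in-memory stand-in for a broker (NOT FastStream's API)."""

    def __init__(self):
        self.handlers = defaultdict(list)  # topic -> [(model, handler), ...]
        self.sent = defaultdict(list)      # topic -> raw JSON strings published

    def subscriber(self, topic, model):
        """Register a handler; incoming JSON is decoded into `model` first."""
        def decorator(func):
            self.handlers[topic].append((model, func))
            return func
        return decorator

    async def publish(self, message, topic):
        """Encode a dataclass as JSON, record it, and deliver it to subscribers."""
        raw = json.dumps(asdict(message))
        self.sent[topic].append(raw)
        for model, handler in self.handlers[topic]:
            # Decode the JSON payload back into a typed object for the handler.
            await handler(model(**json.loads(raw)))


broker = ToyBroker()


@broker.subscriber("persons", Name)
async def on_person(msg: Name) -> None:
    # Business logic only: decoding and encoding are the broker's job.
    await broker.publish(Greetings(greeting=f"hello {msg.name}"), "greetings")


async def main():
    # Send one Name message and return everything recorded on "greetings".
    await broker.publish(Name(name="John"), "persons")
    return broker.sent["greetings"]


if __name__ == "__main__":
    print(asyncio.run(main()))  # prints ['{"greeting": "hello John"}']
```

Because nothing in this sketch talks to a real broker, the same structure doubles as a quick in-memory check of the handler logic: publish a message to the input topic, then inspect what was recorded on the output topic.
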
When the application consumes a message, it will publish a Greetings JSON message to the greetings topic.

We can save the application into the application.py file. Let's take a closer look at the code.

Creating a broker

To create an application, first we need to create a broker. This is the main piece of FastStream and takes care of defining subscribers and producers.

Defining data structures

Next, we need to define the structure of incoming and outgoing data. FastStream is integrated with Pydantic and offers automatic encoding and decoding of JSON-formatted messages into Pydantic classes.

Defining a publisher

Now, we define the publishing logic of our application.

Defining a subscriber

Finally, we can define the subscribing logic of our application. The app will consume data from the "persons" topic and use the defined publisher to produce to the "greetings" topic whenever a message is consumed.

The service can be tested using the TestBroker context managers, which, by default, put the broker into "testing mode".

The Tester will redirect your subscriber- and publisher-decorated functions to the in-memory brokers, allowing you to quickly test your app without the need for a running broker and all its dependencies.

Using pytest, the test for our service would look like this:

In the test, we send a test Name JSON message to the input topic, and then we assert that the broker has responded on the output topic with the appropriate message.

We can save the test to the test_application.py file and run the test by executing the following command in your application root directory.

Here is how the test execution should look in your terminal:

The application can be started using the built-in FastStream CLI command.

To run the service, use the FastStream CLI command and pass the module (in this case, the file where the app implementation is located) and the app symbol to the command.

After running the command, you should see the following output:

Also, FastStream provides a great hot reload feature to improve your development experience:

And a multiprocessing horizontal scaling feature as well:

FastStream provides a command to serve the AsyncAPI documentation. Let's use it to document our application.

To generate and serve the documentation, run the following command:

Now, you should see the following output:

Now open your browser at http://localhost:8000 and enjoy your automatically generated documentation! 🎉

Aaaand, that's it! 🎉 🎉 Feel free to experiment further with your application and check out the documentation for more complex examples.

Ready to join the FastStream revolution? Head over to our GitHub repository and show your support by starring it. By doing so, you'll stay in the loop with the latest developments, updates, and enhancements as we continue to refine and expand FastStream.

Don't forget, we also have an active Discord channel where you can connect with fellow FastStream enthusiasts, ask questions, and share your experiences. Your active participation in our growing community, whether on GitHub or Discord, is invaluable, and we're grateful for your interest and potential contributions. Together, we can make microservices development simpler and more efficient with FastStream.

FastStream is your go-to tool for efficient microservices development. It simplifies message queues, supports various brokers, and offers Pydantic validation and auto-doc generation.

We're immensely grateful for your interest, and we look forward to your potential contributions. With FastStream in your toolkit, you're prepared to conquer the challenges of data-centric microservices like never before. Happy coding!



This post first appeared on VedVyas Articles; please read the original post there.
