Get Even More Visitors To Your Blog, Upgrade To A Business Listing >>

Ingesting Data into OpenSearch using Apache Kafka and Go

Posted on Jul 14 • Originally published at community.aws There are times you might need to write a custom integration layer to fulfill specific requirements in your data pipeline. Learn how to do this with Kafka and OpenSearch using GoScalable data ingestion is a key aspect for a large-scale distributed search and analytics engine like OpenSearch. One of the ways to build a real-time data ingestion pipeline is to use Apache Kafka. It's an open-source event streaming platform used to handle high data volume (and velocity) and integrates with a variety of sources including relational and NoSQL databases. For example, one of the canonical use cases is real-time synchronization of data between heterogeneous systems (source components) to ensure that OpenSearch indexes are fresh and can be used for analytics or consumed downstream applications via dashboards and visualizations.This blog post will cover how to create a data pipeline wherein data written into Apache Kafka is ingested into OpenSearch. We will be using Amazon OpenSearch Serverless, Amazon Managed Streaming for Apache Kafka (Amazon MSK) Serverless. Kafka Connect is a great fit for such requirements. It provides sink connectors for OpenSearch as well as ElasticSearch (which can be used if you opt for the ElasticSearch OSS engine with Amazon OpenSearch). Sometimes though, there are specific requirements or reasons which may warrant the use of a custom solution. For example, you might be using a data source which is not supported by Kafka Connect (rare, but could happen) and don't want to write one from scratch. Or, this could be a one-off integration and you're wondering if it's worth the effort to set up and configure Kafka Connect. Perhaps there are other concerns, like licensing etc.Thankfully, Kafka and OpenSearch provide client libraries for a variety of programming languages which make it possible to write your own integration layer. This is exactly what's covered in this blog! We will make use of a custom Go application to ingest data using Go clients for Kafka and OpenSearch. You will learn:Before we get into the nitty-gritty, here is a quick overview of OpenSearch Serverless and Amazon MSK Serverless.OpenSearch is an open-source search and analytics engine used for log analytics, real-time monitoring, and clickstream analysis. Amazon OpenSearch Service is a managed service that simplifies the deployment and scaling of OpenSearch clusters in AWS.Amazon OpenSearch Service supports OpenSearch and legacy Elasticsearch OSS (up to 7.10, the final open source version of the software). When you create a cluster, you have the option of which search engine to use.You can create an OpenSearch Service domain (synonymous with an OpenSearch cluster) to represent a cluster, with each Amazon EC2 instance acting as a node. However, OpenSearch Serverless eliminates operational complexities by providing on-demand serverless configuration for OpenSearch service. It uses collections of indexes to support specific workloads, and unlike traditional clusters, it separates indexing and search components, with Amazon S3 as the primary storage for indexes. This architecture enables independent scaling of search and indexing functions.You can refer to the details in Comparing OpenSearch Service and OpenSearch Serverless.Amazon MSK (Managed Streaming for Apache Kafka) is a fully managed service for processing streaming data with Apache Kafka. It handles cluster management operations like creation, updates, and deletions. You can use standard Apache Kafka data operations for producing and consuming data without modifying your applications. It supports open-source Kafka versions, ensuring compatibility with existing tools, plugins, and applications.MSK Serverless is a cluster type within Amazon MSK that eliminates the need for manual management and scaling of cluster capacity. It automatically provisions and scales resources based on demand, taking care of topic partition management. With a pay-as-you-go pricing model, you only pay for the actual usage. MSK Serverless is ideal for applications requiring flexible and automated scaling of streaming capacity.Let's start by discussing the high level application architecture before moving on to the architectural considerations.Here is a simplified version of the application architecture that outlines the components and how they interact with each other.The application consists of producer and consumer components, which are Go applications deployed to an EC2 instance:It's worth noting that the blog post has been optimized for simplicity and ease of understanding, hence the solution is not tuned for running production workloads. The following are some of the simplifications that have been made:For a production workload, here are some of the things you should consider:If you still need to deploy your custom application to build a data pipeline from MSK to OpenSearch, here are the range of compute options you can choose from:There is enough material out there that talks about how to use Java based Kafka applications to connect with MSK Serverless using IAM.Let's take a short detour into understanding how this works with Go.MSK Serverless requires IAM access control to handle both authentication and authorization for your MSK cluster. This means that your MSK clients applications (producer and consumer in this case) have to use IAM to authenticate to MSK, based on which they will be allowed or denied specific Apache Kafka actions.The good thing is that the franz-go Kafka client library supports IAM authentication. Here are snippets from the consumer application that show how it works in practice:Note: Since there are multiple Go clients for Kafka (including Sarama), please make sure to consult their client documentation to confirm whether they support IAM authentication.Ok, with that background, let's set up the services required to run our ingestion pipeline.This section will help you set up the following components:You can follow this documentation to setup a MSK Serverless cluster using the AWS Console. Once you do that, note down the following cluster information - VPC, Subnet, Security group (Properties tab) and the cluster Endpoint (click View client information).There are different IAM roles you will need for this tutorial.Start by creating an IAM role to execute the subsequent steps and use OpenSearch Serverless in general with permissions as per Step 1: Configure permissions (in the Amazon OpenSearch documentation).Create another IAM role for the clients applications which will interact with MSK Serverless cluster and use OpenSearch Go client to index data in the OpenSearch Serverless collection. Create an inline IAM policy as below - make sure to substitute the required values.Use the following Trust policy:Finally, another IAM role to which you will attach OpenSearch Serverless Data access policies - more on this in the next step.Create an OpenSearch Serverless collection using the documentation. While following point 8 in Step 2: Create a collection, make sure to configure two Data policies i.e. one each of the IAM roles created in step 2 and 3 in the previous section.Note the for the purposes of this tutorial, we chose Public access type. It's recommended to select VPC for production workloads.Use this documentation to create an AWS Cloud9 EC2 development environment - make sure to use the same VPC as the MSK Serverless cluster.Once complete, you need to do the following: Open the Cloud9 environment, under EC2 Instance, click Manage EC2 instance. In the EC2 instance, navigate to Security and make a note of the attached security group.Open the Security Group associated with the MSK Serverless cluster and add an inbound rule to allow the Cloud9 EC2 instance to connect to it. Choose the security group of the Cloud9 EC2 instance as the source, 9098 as the Port and TCP protocol.Select the Cloud9 environment and choose Open in Cloud9 to launch the IDE. Open a terminal window, clone the GitHub repository and change directory to the folder.Start the producer application:You should see the following logs in the terminal:To send data to the MSK Serverless cluster, use a bash script that will invoke the HTTP endpoint exposed by the application you just started and submit movie data (from movies.txt file) in JSON format using curl:In the producer application terminal logs, you should see output similar to this:For the purposes of this tutorial and to keep it simple and easy to follow, the amount of data has been purposely restricted to 1500 records and the script intentionally sleeps for 1 second after sending each record to the producer. You should be able to follow along comfortably.While the producer application is busy sending data to the movies topic, you can start the consumer application start processing data from the MSK Serverless cluster and index it in the OpenSearch Serverless collection.You should see the following output in the terminal which will indicate that it has indeed started receiving data from the MSK Serverless cluster and indexing it in the OpenSearch Serverless collection.After the process is complete, you should have 1500 movies indexed in the OpenSearch Serverless collection. You don't have to wait for it to finish though. Once there are a few hundred records, you can go ahead and navigate to Dev Tools in the OpenSearch dashboard to execute the below queries.Let's start with a simple query to list all the documents in the index (without any parameters or filters).By default, a search request retrieves the entire JSON object that was provided when indexing the document. Use the _source option to retrieve the source from selected fields. For example, to retrieve only the title, plot and genres fields, run the following query:You can use a Term query to achieve this. For example, to search for movies with the term christmas in the title field, run the following query:**Combine selective field selection with term queryYou can use this query to only retrieve certain fields but are interested in a particular term:Use aggregations to compute summary values based on groupings of the values in a particular field. For example, you can summarize fields like ratings, genre, and year to search results based on the values of those fields. With aggregations, we can answer questions like ‘How many movies are in each genre?“After you are done with the demo, make sure to delete all the services to avoid incurring any additional charges. You can follow the steps in the respective documentation to delete the services.To recap, you deployed a pipeline to ingest data into OpenSearch Serverless using Kafka and then queried it in different ways. Along the way, you also learned about the architectural considerations and compute options to keep in mind for production workloads as well as using Go based Kafka applications with MSK IAM authentication. I would also suggest reading the article Building a CRUD Application in Go for Amazon OpenSearch, particularly if you're looking for a tutorial centered on carrying out OpenSearch operations via the Go SDK.This was pretty lengthy (I think!). Thank you reading it till the end! If you enjoyed this tutorial, found any issues, or have feedback for us, please send it our way!Templates let you quickly answer FAQs or store snippets for re-use. Are you sure you want to hide this comment? It will become hidden in your post, but will still be visible via the comment's permalink. Hide child comments as well Confirm For further actions, you may consider blocking this person and/or reporting abuse TinoMuchenje - May 29 Irishgeoff11 - Jul 2 Lovepreet Singh - Jun 23 Ivo Dias - Jun 21 Once suspended, abhirockzz will not be able to comment or publish posts until their suspension is removed. Once unsuspended, abhirockzz will be able to comment and publish posts again. Once unpublished, all posts by abhirockzz will become hidden and only accessible to themselves. If abhirockzz is not suspended, they can still re-publish their posts from their dashboard. Note: Once unpublished, this post will become invisible to the public and only accessible to Abhishek Gupta. They can still re-publish the post if they are not suspended. Thanks for keeping DEV Community safe. Here is what you can do to flag abhirockzz: abhirockzz consistently posts content that violates DEV Community's code of conduct because it is harassing, offensive or spammy. Unflagging abhirockzz will restore default visibility to their posts. DEV Community — A constructive and inclusive social network for software developers. With you every step of your journey. Built on Forem — the open source software that powers DEV and other inclusive communities.Made with love and Ruby on Rails. DEV Community © 2016 - 2023. We're a place where coders share, stay up-to-date and grow their careers.



This post first appeared on VedVyas Articles, please read the originial post: here

Share the post

Ingesting Data into OpenSearch using Apache Kafka and Go

×

Subscribe to Vedvyas Articles

Get updates delivered right to your inbox!

Thank you for your subscription

×