Leveraging the power of DynamoDB Streams

Technology
September 4, 2023
10 Mins

Pre-requisites: A basic idea of AWS DynamoDB and AWS Lambda functions.

In today's fast-paced digital landscape, businesses are constantly searching for ways to gain a competitive edge. At the heart of this pursuit lies the ability to deliver seamless user experiences through real-time data processing, an increasingly critical challenge with the rise of cloud computing and scalable, distributed databases.

Let’s say you run an e-commerce platform, and you want to enable a feature such that customers receive instant notifications when products they're interested in become available or when their orders are updated. In such a scenario, you would need to capture and process data changes in real time with minimum latency. This is precisely where Amazon DynamoDB Streams, a powerful and versatile feature of Amazon Web Services (AWS), comes to the rescue. DynamoDB Streams offers a streamlined solution for addressing the challenge of real-time data processing. In this blog post, we will explore what DynamoDB Streams is, why it's essential, and how you can leverage it to enhance the capabilities of your applications.

However, this problem existed long before DynamoDB Streams was introduced. How was it tackled back then?

To answer this, let's look at the traditional methods. While conventional relational databases excel at storing and managing structured data, they were not originally designed to handle the demands of real-time processing. When a row changes in a relational database, there is typically no built-in mechanism that notifies the other parts of your application about the change in real time. This leaves a crucial gap in your ability to provide timely updates and reactions to data changes, potentially causing user frustration, missed opportunities, or compliance issues. To address this gap, developers often resort to polling the database at regular intervals to check for changes. This approach is resource-intensive, inefficient, and can lead to unnecessary delays in processing critical data. In today's always-on world, where milliseconds can make a significant difference, such delays are unacceptable.

Enter DynamoDB Streams, a game-changing feature that seamlessly integrates with Amazon DynamoDB, AWS's managed NoSQL database service. With DynamoDB Streams, you can transform your static data into a dynamic force that keeps pace with the ever-evolving needs of your users and your business. One of its major advantages is that it lets you move non-critical business logic out of the request path and into an asynchronous process, which offers benefits like improved responsiveness, scalability, reduced blocking, fault tolerance, efficient resource usage, enhanced user experience, task parallelism, and support for long-running operations. It does this by capturing changes to DynamoDB tables as they happen and triggering AWS Lambda functions to execute that logic. This decouples processing from the main application, scales independently, isolates error handling, and optimises costs, making it well suited to modern, event-driven architectures.

Now that we have understood why we need DynamoDB Streams, let's delve deeper into the technical aspects.

DynamoDB-Powered User Data Management using Lambda Functions

Imagine you have a DynamoDB table named "Users," which stores user profiles for your application. Whenever a user's profile is updated, it triggers a cascade of actions that need to be executed in real time. Three essential Lambda functions come into play here:

  1. "onUserObjectChange" from the "AppUserManagementService" service.
  2. "onUserObjectChange" from the "WorkplaceUserV2" service.
  3. "upsertWorkplaceUserToElasticSearchV2" from the "WorkplaceUserV2" service.

Let's break down this scenario step by step:

1.  DynamoDB Streams: The Real-Time Data Pipeline

Every time a record is inserted, updated, or deleted in the "Users" table, a corresponding event is captured by DynamoDB Streams. This event-driven approach ensures that changes to your user data are detected as they happen and can trigger subsequent actions.

Figure: Real Time Data Processing

2. Event Data Format

When changes occur in your DynamoDB table, such as the "Users" table, Amazon DynamoDB Streams captures these changes in real time. The event data provided by DynamoDB Streams contains essential information, including:

1. Types of Events: DynamoDB Streams captures three types of events, reported in the eventName field of each record:

  • INSERT: This event type occurs when a new item is inserted into the DynamoDB table.
  • MODIFY: The MODIFY event type captures changes to existing items in the table. This includes updates to one or more attributes of an item.
  • REMOVE: When an item is deleted from the table, DynamoDB Streams generates a REMOVE event to record this action.

2. Type of Data: What each record contains depends on the stream view type you choose when enabling the stream, combined with the event type:

  • Keys Only (KEYS_ONLY): Records contain only the primary key attributes of the changed item. This is useful when you need to know which item was affected but don't require the full item details.
  • New Image Only (NEW_IMAGE): INSERT and MODIFY records include the new image, which represents the item's attributes as they appear after the change. REMOVE records carry no image.
  • Old Image Only (OLD_IMAGE): MODIFY and REMOVE records include the old image, representing the item's attributes as they were before the change. INSERT records carry no image.
  • New and Old Images (NEW_AND_OLD_IMAGES): MODIFY records provide both images. The new image reflects the item's attributes after the change, while the old image shows the state of the item before it. INSERT records carry only the new image, and REMOVE records only the old one.
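
As a quick reference, the rules above can be sketched as a small lookup. The view type names (KEYS_ONLY, NEW_IMAGE, OLD_IMAGE, NEW_AND_OLD_IMAGES) and the event names (INSERT, MODIFY, and REMOVE, which is the value reported for deletions) are the ones DynamoDB uses. The helper expected_images is a hypothetical name introduced here purely for illustration.

```python
# Which image keys a stream record can carry, by stream view type and event name.
# expected_images is an illustrative helper, not part of any AWS SDK.

NEW_IMAGE_EVENTS = {"INSERT", "MODIFY"}  # the item exists after the change
OLD_IMAGE_EVENTS = {"MODIFY", "REMOVE"}  # the item existed before the change


def expected_images(view_type: str, event_name: str) -> set[str]:
    """Return the image keys ("NewImage", "OldImage") expected in a record."""
    images = set()
    if view_type in ("NEW_IMAGE", "NEW_AND_OLD_IMAGES") and event_name in NEW_IMAGE_EVENTS:
        images.add("NewImage")
    if view_type in ("OLD_IMAGE", "NEW_AND_OLD_IMAGES") and event_name in OLD_IMAGE_EVENTS:
        images.add("OldImage")
    return images  # an empty set means the record carries only "Keys"
```

A handler can use a check like this to fail fast when it relies on an image that the configured view type will never deliver.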
  
Sample DynamoDB Streams event:

    {
      "Records": [
        {
          "eventID": "1",
          "eventName": "INSERT",
          "eventVersion": "1.1",
          "eventSource": "aws:dynamodb",
          "awsRegion": "us-east-1",
          "dynamodb": {
            "Keys": {
              "UserId": {
                "N": "12345"
              }
            },
            "NewImage": {
              "UserId": {
                "N": "12345"
              },
              "Username": {
                "S": "john_doe"
              },
              "Email": {
                "S": "john.doe@example.com"
              }
            },
            "ApproximateCreationDateTime": 1627992080
          },
          "eventSourceARN": "arn:aws:dynamodb:us-east-1:123456789012:table/Users/stream/2023-09-01T00:00:00.000"
        }
      ]
    }
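
Notice that every attribute in the event is wrapped in a type tag such as "S" or "N". Before applying business logic, a handler usually unwraps these into plain values. The following is a minimal sketch of that step, assuming only the S, N, BOOL, NULL, L, and M tags are needed (sets and binary values are deliberately left out). The names from_attr and summarize are hypothetical helpers, not AWS APIs.

```python
from decimal import Decimal


def from_attr(value: dict):
    """Convert one DynamoDB-typed value, e.g. {"S": "x"}, to a plain Python value."""
    type_tag, raw = next(iter(value.items()))
    if type_tag == "S":
        return raw
    if type_tag == "N":
        return Decimal(raw)  # numbers arrive as strings; Decimal keeps their precision
    if type_tag == "BOOL":
        return raw
    if type_tag == "NULL":
        return None
    if type_tag == "L":
        return [from_attr(v) for v in raw]
    if type_tag == "M":
        return {k: from_attr(v) for k, v in raw.items()}
    raise ValueError(f"unsupported type tag: {type_tag}")


def summarize(event: dict) -> list[tuple[str, dict]]:
    """Return (eventName, plain-dict image) for each record in a stream event."""
    out = []
    for record in event.get("Records", []):
        change = record["dynamodb"]
        # REMOVE records have no NewImage, so fall back to OldImage, then Keys.
        image = change.get("NewImage") or change.get("OldImage") or change["Keys"]
        out.append((record["eventName"], {k: from_attr(v) for k, v in image.items()}))
    return out
```

Called with the sample event above, summarize would yield a single INSERT entry whose image holds UserId, Username, and Email as plain values.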
  

3. Lambda Functions: Handling the Changes

a. "onUserObjectChange" of AppUserManagementService and WorkplaceUserV2 Service:

These AWS Lambda functions are designed to handle user profile changes as they occur. When DynamoDB Streams detects an update in the "Users" table, it triggers both "onUserObjectChange" Lambda functions.

  • Data Transformation: The Lambda functions receive the event data, which includes both the old image (previous state) and the new image (updated state) of the user profile.
  • Business Logic: Inside the Lambda functions, the application's specific business logic is executed, utilising the old and new images as needed. This may include processing the updated user data, generating authentication tokens, or updating related information.
  • Updates to the "AppUsers_V2" and "WorkplaceUsers_V2" Tables: After processing and applying any necessary transformations or logic, the functions update the "AppUsers_V2" and "WorkplaceUsers_V2" tables with the new user information or any relevant changes. This ensures that your application's data remains up to date and synchronised for user-related operations.
  
    functions:
      onUserObjectChangeV2:
        handler: src/index/aws.onUserObjectChange
        events:
          - stream:
              type: dynamodb
              arn: ${self:custom.StreamArn.Users.${env:NODE_ENV}}
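
To make the three steps above more concrete, here is a rough sketch of what such a handler could look like. It is not the actual onUserObjectChange implementation, which isn't shown in this post. It assumes the stream delivers both old and new images, and it takes the persistence step as a caller-supplied function, write_user, which is a hypothetical stand-in for the update to the "AppUsers_V2" table.

```python
def changed_attributes(old_image: dict, new_image: dict) -> dict:
    """Return {attribute: new typed value} for attributes that were added or changed."""
    return {k: v for k, v in new_image.items() if old_image.get(k) != v}


def on_user_object_change(event: dict, write_user) -> int:
    """Process MODIFY records and hand the changed attributes to write_user.

    write_user(keys, changes) is supplied by the caller (hypothetical here);
    in a real service it would persist the changes to the target table.
    Returns the number of records that led to a write.
    """
    writes = 0
    for record in event.get("Records", []):
        if record.get("eventName") != "MODIFY":
            continue
        change = record["dynamodb"]
        diff = changed_attributes(change.get("OldImage", {}), change.get("NewImage", {}))
        if diff:  # skip updates that did not change any attribute value
            write_user(change["Keys"], diff)
            writes += 1
    return writes
```

Passing the writer in as a parameter keeps the diffing logic testable without touching AWS. Note that this sketch ignores attributes that were removed from the item, which a production handler would likely need to handle as well.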
  

b. "upsertWorkplaceUserToElasticSearchV2" of WorkplaceUserV2 Service

The "WorkplaceUserV2" service includes another Lambda function named "upsertWorkplaceUserToElasticSearchV2". This function gets triggered whenever there's any change in the "WorkplaceUsers_V2" table.

  • Data Transformation: This Lambda function receives event data that includes both the old and new images of user profiles from the "WorkplaceUsers_V2" table.
  • Business Logic: It executes specific business logic related to user data, possibly generating or updating workplace-related information.
  • Updates to Amazon Elasticsearch: After processing the event and applying relevant logic, this function updates the "WorkplaceUserV2" data in Amazon Elasticsearch, ensuring that the search index remains in sync with the latest user data.
  
    functions:
      upsertWorkplaceUserToElasticSearchV2:
        handler: src/index/aws.upsertWorkplaceUserToElasticSearch
        events:
          - stream:
              type: dynamodb
              arn: ${self:custom.StreamArn.WorkplaceUserV2.${env:NODE_ENV}}
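
One way to keep a search index in sync is to translate each batch of stream records into a single Elasticsearch _bulk request body. The sketch below only builds that newline-delimited JSON body and leaves the HTTP call out. It is an illustration, not the service's real code: the index name "workplace-users" is made up, the table is assumed to have a simple (single-attribute) primary key, and to_bulk_body is a hypothetical helper. REMOVE is the event name DynamoDB Streams reports for deletions.

```python
import json


def to_bulk_body(event: dict, index: str = "workplace-users") -> str:
    """Build an Elasticsearch _bulk request body (NDJSON) from a stream event."""
    lines = []
    for record in event.get("Records", []):
        change = record["dynamodb"]
        # Use the value of the first key attribute as the document id
        # (assumes a simple primary key).
        key_attr = next(iter(change["Keys"].values()))
        doc_id = next(iter(key_attr.values()))
        if record["eventName"] == "REMOVE":
            lines.append(json.dumps({"delete": {"_index": index, "_id": doc_id}}))
        else:
            # Strip the outer type tag: {"Email": {"S": "a@b"}} becomes {"Email": "a@b"}.
            # Numbers stay strings and nested maps/lists keep their inner tags.
            doc = {k: next(iter(v.values())) for k, v in change.get("NewImage", {}).items()}
            lines.append(json.dumps({"index": {"_index": index, "_id": doc_id}}))
            lines.append(json.dumps(doc))
    # The bulk API expects every line, including the last, to end with a newline.
    return ("\n".join(lines) + "\n") if lines else ""
```

Using the "index" action gives upsert behaviour, since it creates the document if it is missing and replaces it otherwise.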
  

4. Event Filters for Lambda Functions

Now that we have configured DynamoDB Streams, we might find that events are created even for minor updates that have no immediate impact on our application. We would want to process events efficiently and avoid unnecessary Lambda invocations in order to keep costs under control. This is where "Event Filters" come to our rescue: filtering logic that is evaluated on the AWS side, before our function is ever invoked.

Event filters allow us to filter out events that are irrelevant to our business logic by defining filter criteria that control which events can invoke a Lambda function. Suppose we want to trigger the Lambda function only for MODIFY events, in batches of up to 10 events. The following changes will be required in our serverless.yml file.

  
    functions:
      onUserObjectChangeV2:
        handler: src/index/aws.onUserObjectChange
        events:
          - stream:
              type: dynamodb
              arn: ${self:custom.StreamArn.Users.${env:NODE_ENV}}
              batchSize: 10  # Adjust the batch size as needed
              filterPatterns:
                - eventName:
                    - MODIFY
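
Behind the scenes, a filter like the one in serverless.yml above ends up as the FilterCriteria of a Lambda event source mapping, where each pattern is stored as a JSON string. If you manage the mapping yourself, a small helper can build that structure. This is a sketch: the Filters and Pattern keys follow the Lambda API, while build_filter_criteria and the Status attribute are made-up names used only for illustration.

```python
import json


def build_filter_criteria(patterns: list[dict]) -> dict:
    """Wrap filter patterns in the FilterCriteria shape used by event source mappings."""
    return {"Filters": [{"Pattern": json.dumps(p)} for p in patterns]}


# Only MODIFY events whose new image has Status == "completed"
# (Status is a hypothetical attribute of the item).
criteria = build_filter_criteria([
    {
        "eventName": ["MODIFY"],
        "dynamodb": {"NewImage": {"Status": {"S": ["completed"]}}},
    }
])
```

The resulting dictionary is what you would pass as the FilterCriteria argument when creating or updating an event source mapping, for example through boto3's Lambda client.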
  

Filter criteria are written as JSON patterns that Lambda evaluates against each record before invoking the function. The specific conditions you can apply may vary depending on the event source, but here are the common ones and their limits:

  • Event Type Filter: Filter events based on their type, such as INSERT, MODIFY, or REMOVE. This allows you to process specific types of events while ignoring others.
  • Attribute-Based Filtering: Filter events based on the values of specific attributes within the event data. For example, you can trigger a Lambda function only if a certain attribute in the new image meets a particular condition (e.g., status = 'completed').
  • Pattern Matching: Match attribute values using the comparison operators that filter patterns provide, such as prefix matching, numeric comparison, existence checks, and anything-but. Regular expressions are not supported.
  • ARN-Based Filtering: A filter belongs to one event source mapping, which is already tied to a single stream ARN. To control which sources trigger your Lambda functions, choose which streams you map to them.
  • Time-Based Filtering: Filter patterns are static, so they cannot express a relative window such as "the last hour". Checks like that belong in the handler.
  • Custom Metadata Filtering: If your items carry their own metadata attributes, such as tags or labels, you can filter on them like any other attribute in the record.
  • Batch Size: While not a filter, the batch size determines the maximum number of events included in a single Lambda invocation. Adjusting it changes how you process events.
  • Complex Logical Filters: Fields within one pattern must all match (AND), while multiple values listed for a field, or multiple patterns on the same event source, act as alternatives (OR). This allows you to define precise criteria for event processing.
  • Compound Filters: A single pattern can combine an event type condition with one or more attribute conditions, which covers most practical filtering needs.
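
Since filters are evaluated by AWS before the function runs, it helps to sanity-check a pattern locally first. The sketch below is a simplified approximation of that matching logic, not the real evaluator: it supports exact values only (no operators such as prefix or numeric), and matches and passes_filters are hypothetical helpers. It does illustrate the semantics: fields within a pattern are combined with AND, values listed for a field with OR, and multiple patterns with OR.

```python
def matches(pattern: dict, record: dict) -> bool:
    """Exact-value matching: every pattern field must match (AND);
    a leaf lists the accepted values (OR)."""
    for key, expected in pattern.items():
        if key not in record:
            return False
        actual = record[key]
        if isinstance(expected, dict):
            # Nested pattern: descend into the corresponding part of the record.
            if not isinstance(actual, dict) or not matches(expected, actual):
                return False
        elif actual not in expected:
            return False
    return True


def passes_filters(patterns: list[dict], record: dict) -> bool:
    """With no patterns everything passes; otherwise any one matching pattern is enough."""
    return not patterns or any(matches(p, record) for p in patterns)
```

For instance, a MODIFY record would pass a pattern of {"eventName": ["INSERT", "MODIFY"]} but fail one that also requires an attribute value the record does not have.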

However, factors such as your application's architecture, scalability requirements, technology stack, and the specific database system you are using also need to be taken into consideration when deciding which approach to use for real-time data processing. Now that we have discussed how DynamoDB Streams and AWS Lambda functions can be used, let's look at some other approaches to handling this problem:

  1. Change Data Capture (CDC) Tools: Use dedicated CDC tools or platforms that can capture changes from your database in real time. These tools can include Debezium for relational databases, Apache NiFi, or third-party solutions like Change Data Capture for MongoDB.
  2. Database Triggers: Implement database triggers within your database system to capture changes and write them to a dedicated change log table. Your application can then monitor this change log table for real-time updates.
  3. WebSocket Notifications: Utilize WebSocket connections to push real-time updates to connected clients when changes occur in the database. This is particularly useful for applications requiring real-time data synchronization between the server and clients.
  4. Message Queues: Use message queuing systems like RabbitMQ, Apache Kafka, or Amazon SQS to publish database change events as messages. Subscribers can then consume these messages in real-time to perform actions based on the changes.
  5. Database Change Polling: Implement a polling mechanism in your application to periodically query the database for changes. While not truly real-time, this approach can be effective for scenarios where near-real-time updates are acceptable.
  6. Custom Polling Services: Develop custom polling services or cron jobs that periodically query the database for changes and take action when changes are detected. This approach can be suitable for applications with less stringent real-time requirements.
  7. In-Memory Databases or Caches: Maintain an in-memory database or cache, such as Redis or Amazon ElastiCache, and keep it synchronized with your primary database. Updates to the primary database trigger updates in the cache, providing real-time access to frequently accessed data.
  8. Event Sourcing: Implement event sourcing patterns, where changes to the database are recorded as immutable events. These events can be consumed by subscribers for real-time updates and can serve as a historical record of changes.
  9. Third-Party Integration: Leverage third-party services or APIs that offer real-time database change notifications. Some database providers and cloud platforms offer built-in real-time change notification features.
  10. Custom Webhooks: Implement custom webhook endpoints in your application to receive HTTP notifications from external systems or services when database changes occur.
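
For comparison with the stream-based approach, here is what the polling options (5 and 6) boil down to. This is a minimal in-memory sketch that assumes every row carries an updated_at timestamp maintained by the application; Row and poll_changes are hypothetical names, and a real implementation would run the equivalent query against the database.

```python
from dataclasses import dataclass


@dataclass
class Row:
    id: int
    updated_at: float  # epoch seconds, set by the application on every write


def poll_changes(rows: list[Row], watermark: float) -> tuple[list[Row], float]:
    """Return rows modified after the watermark, plus the advanced watermark."""
    changed = sorted(
        (r for r in rows if r.updated_at > watermark),
        key=lambda r: r.updated_at,
    )
    new_watermark = changed[-1].updated_at if changed else watermark
    return changed, new_watermark
```

Each polling cycle passes in the watermark returned by the previous one. Deleted rows never show up in such a query unless the application uses soft deletes, which is one of the gaps that stream-based change capture closes.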

In conclusion, the combination of DynamoDB Streams and AWS Lambda functions provides a dynamic and efficient solution for processing real-time events from a DynamoDB database. It offers real-time responsiveness, cost-effectiveness through serverless computing, simplified architecture, scalability, flexibility in implementing custom logic, easy integration with other AWS services, and robust monitoring capabilities. This combination empowers developers to build event-driven architectures that react swiftly to changes in the database, enhancing application performance and user experiences.

This blog post was originally published here.
