Optimistic locking on MongoDB

There is a recurring problem on the persistence side of our models: concurrent updates. Some of us settle for last-one-wins and do not protect an entity against simultaneous modifications. This is OK in most cases (like CRUD), but when a new value has to be calculated from the previous one we run into problems. I encountered this situation in my event-based architectures, where the read models are updated as the events arrive.

But let’s start with an example. Suppose that we have a simple product entity that can be rated, and we only need to provide the average rating and the number of ratings. Our model would look something like this:

class Product
{
    /**
     * @var AverageRating
     */
    private $averageRating;

    /**
     * @var ProductId
     */
    private $productId;

    public function rate(int $rating)
    {
        $this->averageRating = $this->averageRating->addRating($rating);
    }
}

class AverageRating
{
    private $value = 0;
    private $count = 0;

    public function addRating(int $rating): self
    {
        $other = clone $this;
        $other->value = ($rating + $this->count * $this->value) / (1 + $this->count);
        $other->count++;
        return $other;
    }

    public function getValue(): float
    {
        return $this->value;//an average is rarely a whole number, so it must not be declared as int
    }

    public function getCount(): int
    {
        return $this->count;
    }
}
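
To see what the formula in addRating does, here is a minimal sketch that folds a few ratings into a running average and compares the result with the plain average. The function name nextAverage and the sample ratings are mine, for illustration only; they are not part of the model above.

```php
<?php
declare(strict_types=1);

// Hypothetical helper mirroring the formula used by AverageRating::addRating:
// it folds one more rating into an existing average, without keeping the
// individual ratings around.
function nextAverage(float $average, int $count, int $rating): float
{
    return ($rating + $count * $average) / (1 + $count);
}

$ratings = [5, 3, 4, 4];

$average = 0.0;
$count = 0;
foreach ($ratings as $rating) {
    $average = nextAverage($average, $count, $rating);
    $count++;
}

// The incremental result matches the average computed from all the ratings.
$plainAverage = array_sum($ratings) / count($ratings);
printf("incremental: %.2f, plain: %.2f, count: %d\n", $average, $plainAverage, $count);
```

The important point is that each new value depends on the previous average and count, which is exactly why a stale copy of the entity produces a wrong result.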

But suppose we handle the event like this:

    public function onProductWasRated(ProductWasRated $event)
    {
        $product = $this->loadProduct($event->getProductId());

        $product->rate($event->getRating());
        
        $this->saveProduct($product);
    }

We would be in big trouble if two events arrive at the same time. Both handlers load the product (let’s suppose the previous rating count is 100), both add a rating (the new rating count becomes 101 in each of them), and then both persist the result, so a rating count of 101 is stored by whichever operation finishes last. This is bad: there were 100 ratings, two more arrived, and only one had an effect. Please notice that the protection provided by the underlying database (row locking, document locking, whatever) does not help here, because it only guards each individual write, not the whole load-calculate-persist sequence.
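
The unlucky interleaving can be reproduced without any database. The following is only a sketch: the $store array stands in for the persistence layer and the variable names are made up for the illustration.

```php
<?php
declare(strict_types=1);

// In-memory stand-in for the database: one stored rating count per product.
$store = ['product-1' => 100];

// Both handlers load before either of them saves (the unlucky interleaving).
$loadedByFirst = $store['product-1'];
$loadedBySecond = $store['product-1'];

// Each handler adds its own rating to the copy it has loaded.
$computedByFirst = $loadedByFirst + 1;
$computedBySecond = $loadedBySecond + 1;

// Both save; the second write silently overwrites the first one.
$store['product-1'] = $computedByFirst;
$store['product-1'] = $computedBySecond;

echo $store['product-1'], "\n"; // 101, although two ratings arrived
```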

So, how do we add both ratings, even if they arrive at the same time? The answer is: by detecting the conflict and retrying the whole load-calculate-persist process. This is called optimistic locking.

But for this to work you have to detect concurrent updates and refuse to persist if another update got in first. For this you need a version property on your entity. It holds the version that existed when the entity was loaded; when you try to persist your changes, you check that the stored version is still equal to the loaded one. If the stored entity has a newer version, you based your calculations on an old version of that entity and you must retry.
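
Here is a minimal sketch of the version check, assuming a made-up InMemoryStore class instead of a real database. In a real store the comparison and the write must happen as one atomic operation; in this single-threaded sketch that is trivially true.

```php
<?php
declare(strict_types=1);

// Hypothetical in-memory store, used only to illustrate the version check.
final class InMemoryStore
{
    private $documents;

    public function __construct(array $documents)
    {
        $this->documents = $documents;
    }

    public function load(string $id): array
    {
        return $this->documents[$id];
    }

    // Writes only if the stored version is still the one that was loaded.
    public function saveIfVersionIs(string $id, int $loadedVersion, array $data): bool
    {
        if ($this->documents[$id]['version'] !== $loadedVersion) {
            return false; // somebody else saved in the meantime
        }
        $data['version'] = $loadedVersion + 1;
        $this->documents[$id] = $data;

        return true;
    }
}

$store = new InMemoryStore(['product-1' => ['count' => 100, 'version' => 7]]);

// Two handlers load the same version of the product.
$first = $store->load('product-1');
$second = $store->load('product-1');

$first['count']++;
$firstSaved = $store->saveIfVersionIs('product-1', $first['version'], $first);

$second['count']++;
$secondSaved = $store->saveIfVersionIs('product-1', $second['version'], $second);
$secondWasStale = !$secondSaved;

// The second handler lost the race, so it retries the whole cycle.
if ($secondWasStale) {
    $fresh = $store->load('product-1');
    $freshVersion = $fresh['version'];
    $fresh['count']++;
    $secondSaved = $store->saveIfVersionIs('product-1', $freshVersion, $fresh);
}

$final = $store->load('product-1');
echo 'count: ', $final['count'], ', version: ', $final['version'], "\n";
```

This time both ratings are counted: the stale write is rejected instead of overwriting the first one, and the retry starts from the fresh data.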

In order to extract this algorithm into a class we make a Versionable interface like this:

interface Versionable
{
    public function getVersion(): int;
}

Then, our product must implement this interface:

class Product implements Versionable
{
    /**
     * @var AverageRating
     */
    private $averageRating;

    /**
     * @var ProductId
     */
    private $productId;

    /**
     * @var int
     */
    private $version = 0;

    public function rate(int $rating)
    {
        $this->averageRating = $this->averageRating->addRating($rating);
    }
    
    public function getVersion(): int
    {
        return $this->version;
    }
}

Our refactored product list would look like this:

    
    public function onProductWasRated(ProductWasRated $event)
    {
        $this->update(
            $event->getProductId(),
            null, //the factory: do not create the product if it does not exist before rating
            function (Product $product) use ($event) { //the updater
                $product->rate($event->getRating());

                return $product; //rate() returns nothing, so hand the updated entity back explicitly
            }
        );
    }

    private function update(ProductId $id, ?callable $factory, callable $updater)
    {
        $this->updater->addOrUpdate(
            $this->getCollection(),
            $id,
            function ($id) {
                return $this->loadProduct($id);
            },
            $factory,
            $updater,
            function ($entity) {
                return $this->serializer->convert($entity);
            }
        );
    }

It is OK to use callables without stricter type hints here because this is a private matter of the class. To keep the event handler at a single level of abstraction I have extracted a new method: private function update. This method accepts the ProductId, the factory callable (called if the product does not exist yet; pass null to skip the creation) and the updater callable that performs the actual update on the product and returns it.

And now the updater:

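//assuming the mongodb extension and the mongodb/mongodb library
use MongoDB\BSON\ObjectID;
use MongoDB\BSON\UTCDateTime;
use MongoDB\Collection;

class OptimisticMongoDocumentUpdater
{
    public function addOrUpdate(Collection $collection, $id, callable $loader, ?callable $factory, callable $updater, callable $serializer)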
    {
        /**
         * We try to add/update the entity in a concurrent safe manner
         * using optimistic locking: we always try to update the existing version;
         * if another concurrent write has finished before us in the mean time
         * then retry the *whole* updating process
         */
        do {
            /** @var Versionable|null $entity */
            $entity = $loader($id);
            $isNew = !$entity;
            if ($isNew) {
                if (!$factory) {
                    return;//do not create if factory does not exist
                }
                $entity = $factory();
                $version = 0;
            } else {
                $version = $entity->getVersion();
                $entity = $updater($entity);
            }

            $serialized = $serializer($entity);

            //the version is managed only by $inc below, never by $set
            unset($serialized['version']);
            $serialized['lastModified'] = new UTCDateTime();

            $result = $collection->updateOne(
                [
                    '_id'     => new ObjectID($id),
                    'version' => $version,
                ],
                [
                    '$set' => $serialized,
                    '$inc' => ['version' => 1],
                ],
                [
                    //upsert only a brand new entity: for an existing one a stale version must
                    //match nothing, while an upsert would try to insert a duplicate _id and fail
                    'upsert' => $isNew,
                ]
            );
        } while (0 == $result->getMatchedCount() && 0 == $result->getUpsertedCount());//no side effect? then concurrent update -> retry
    }
}

Pretty nice, right? 🙂

So, the idea of this updater is that it tries to update the known version of an entity. If that version does not exist anymore, the whole process is restarted.
Another important detail is that the version is incremented atomically, at the same time as the actual update, using the $inc operator.
You now have a concurrency-safe MongoDB entity updater 🙂
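
One thing you may want to add on top of this is a limit on the number of retries, so that a heavily contended entity cannot keep a handler spinning forever. The following is just a sketch of that idea and not part of the updater above; the helper name retryOptimistically and the default of 5 attempts are my own assumptions.

```php
<?php
declare(strict_types=1);

// Hypothetical helper: runs $attempt until it reports success or the attempt
// budget is used up. $attempt must perform a full load-calculate-persist
// cycle and return true when its write was applied.
function retryOptimistically(callable $attempt, int $maxAttempts = 5): int
{
    for ($tries = 1; $tries <= $maxAttempts; $tries++) {
        if ($attempt()) {
            return $tries;
        }
    }

    throw new RuntimeException("Gave up after $maxAttempts conflicting attempts");
}

// Simulated writer that loses the race twice before it succeeds.
$conflictsLeft = 2;
$tries = retryOptimistically(function () use (&$conflictsLeft) {
    if ($conflictsLeft > 0) {
        $conflictsLeft--;

        return false; // the version check failed, somebody else wrote first
    }

    return true;
});

echo "succeeded after $tries attempts\n";
```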

Remove code duplication on lists

Sometimes you have a list of items and you need to perform a lot of actions on them, like querying the state of an item or changing it.

In this situation a lot of code duplication appears; more exactly, a lot of foreach statements that walk the item list, test whether the current item is the right one and then call a method on it.

Let’s suppose that we have a User class with some command and query methods, like this:

<?php
/**
 * Copyright (c) 2017 Constantin Galbenu <gica.galbenu@gmail.com>
 */
namespace Gica;

class User
{
    private $name;
    private $active;
    private $id;

    public function __construct(
        int $id,
        string $name,
        bool $active = false
    )
    {
        $this->id = $id;
        $this->name = $name;
        $this->active = $active;
    }

    public function getId(): int
    {
        return $this->id;
    }

    public function getName(): string
    {
        return $this->name;
    }

    public function isActive(): bool
    {
        return $this->active;
    }

    public function deactivate()
    {
        $this->active = false;
    }

    public function activate()
    {
        $this->active = true;
    }
}

We also have a class that holds a list of Users; let’s name it ClassicUserList, something like this:

<?php
/**
 * Copyright (c) 2017 Constantin Galbenu <gica.galbenu@gmail.com>
 */

namespace Gica;

class ClassicUserList
{
    /**
     * @var User[]
     */
    private $users;

    public function __construct($users)
    {
        $this->users = $users;
    }

    public function activateUser(int $idUser)
    {
        foreach ($this->users as $user) {
            if ($user->getId() === $idUser) {
                $user->activate();
            }
        }
    }

    public function deactivateUser(int $idUser)
    {
        foreach ($this->users as $user) {
            if ($user->getId() === $idUser) {
                $user->deactivate();
            }
        }
    }

    public function isUserActive(int $idUser): bool
    {
        foreach ($this->users as $user) {
            if ($user->getId() === $idUser) {
                return $user->isActive();
            }
        }

        return false;/* if user does not exist*/
    }

    public function userExists(int $idUser): bool
    {
        foreach ($this->users as $user) {
            if ($user->getId() === $idUser) {
                return true;
            }
        }

        return false;/* if user does not exist*/
    }

    public function getUserName(int $idUser): string
    {
        foreach ($this->users as $user) {
            if ($user->getId() === $idUser) {
                return $user->getName();
            }
        }

        return '';/* if user does not exist*/
    }
}

Do you see how many foreach statements exist? There is a lot of code duplication and DRY screams at us!

The solution is to extract the code that repeats into a specialized method: callOnUser. Let’s create another class, name it UserList, that uses this pattern:

<?php
/**
 * Copyright (c) 2017 Constantin Galbenu <gica.galbenu@gmail.com>
 */

namespace Gica;

class UserList
{
    /**
     * @var User[]
     */
    private $users;

    public function __construct($users)
    {
        $this->users = $users;
    }

    public function activateUser(int $idUser)
    {
        $this->callOnUser($idUser, function (User $user) {
            $user->activate();
        });
    }

    public function deactivateUser(int $idUser)
    {
        $this->callOnUser($idUser, function (User $user) {
            $user->deactivate();
        });
    }

    public function isUserActive(int $idUser): bool
    {
        return $this->callOnUser($idUser, function (User $user) {
            return $user->isActive();
        }, false /* if user does not exist*/);
    }

    public function userExists(int $idUser): bool
    {
        return $this->callOnUser($idUser, function () {
            return true;
        }, false /* if user does not exist*/);
    }

    public function getUserName(int $idUser): string
    {
        return $this->callOnUser($idUser, function (User $user) {
            return $user->getName();
        }, '' /* if user does not exist*/);
    }

    private function callOnUser(int $idUser, callable $querier, $default = null)
    {
        foreach ($this->users as $user) {
            if ($user->getId() === $idUser) {
                return $querier($user);
            }
        }

        return $default;
    }
}

The callOnUser method is private, so that the items can be reached only in the ways intended by the public interface. It accepts a callback that will be called with the found item and an optional default value that is returned when no item matches.

The method calls the callback with the item as argument (a User in this example) and returns its result.

You could apply this pattern when the list has a lot of methods that forward to its items and the list is not indexed by the item IDs.
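
For contrast, when the list is indexed by ID the search loop disappears and there is nothing left to extract. A minimal sketch, assuming a made-up IndexedNameList class that keeps only the names keyed by user ID:

```php
<?php
declare(strict_types=1);

// Hypothetical variant: the items are stored in an array keyed by ID, so the
// lookup is a plain array access and no search loop is needed.
final class IndexedNameList
{
    /**
     * @var string[] names keyed by user ID
     */
    private $namesById;

    public function __construct(array $namesById)
    {
        $this->namesById = $namesById;
    }

    public function userExists(int $idUser): bool
    {
        return isset($this->namesById[$idUser]);
    }

    public function getUserName(int $idUser): string
    {
        return $this->namesById[$idUser] ?? ''; /* if user does not exist */
    }
}

$list = new IndexedNameList([7 => 'Ana', 9 => 'Ion']);

var_dump($list->userExists(9));
var_dump($list->getUserName(7));
var_dump($list->getUserName(42));
```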
