Optimistic locking on MongoDB

There is a recurring problem on the persistence side of our models: concurrent updates. Some of us settle for last-write-wins and do not protect an entity against simultaneous modifications. That is fine in many cases (plain CRUD, for example), but it breaks down when the new value is calculated from the previous one. I ran into this in my event-based architectures, where the read models are updated as the events arrive.

Let's start with an example. Suppose we have a simple product entity that can be rated, and we only need to expose the average rating and the number of ratings. Our model would look something like this:

class Product
{
    /**
     * @var AverageRating
     */
    private $averageRating;

    /**
     * @var ProductId
     */
    private $productId;

    public function rate(int $rating)
    {
        $this->averageRating = $this->averageRating->addRating($rating);
    }
}

class AverageRating
{
    private $value = 0;
    private $count = 0;

    public function addRating(int $rating): self
    {
        $other = clone $this;
        $other->value = ($rating + $this->count * $this->value) / (1 + $this->count);
        $other->count++;
        return $other;
    }

    public function getValue(): float
    {
        return $this->value;
    }

    public function getCount(): int
    {
        return $this->count;
    }
}
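
To make the arithmetic in addRating concrete, here is a minimal standalone sketch of the same running-average formula. The function name addToAverage is mine, not part of the model above; it only shows that folding one rating into an (average, count) pair gives the same result as recomputing the mean over all ratings, and that the average is generally fractional.

```php
<?php
// Hypothetical helper: folds one rating into an existing (average, count) pair,
// using the same formula as AverageRating::addRating.
function addToAverage(float $average, int $count, int $rating): array
{
    return [($rating + $count * $average) / ($count + 1), $count + 1];
}

// Three ratings (3, 4, 5) average to 4.0; a fourth rating of 5 arrives.
[$average, $count] = addToAverage(4.0, 3, 5);

// (5 + 3 * 4.0) / 4 = 4.25, the same as (3 + 4 + 5 + 5) / 4
printf("%.2f after %d ratings\n", $average, $count);
```
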

But suppose the event handler looks like this:

    public function onProductWasRated(ProductWasRated $event)
    {
        $product = $this->loadProduct($event->getProductId());

        $product->rate($event->getRating());
        
        $this->saveProduct($product);
    }

We are in big trouble if two events arrive at the same time. Both handlers load the product (let's suppose the rating count is 100 at that point), both add their rating (each in-memory copy now has a count of 101), and both persist the result, so the stored count ends up at 101, written by whichever handler saved last. There were 100 ratings, two more arrived, and only one of them had any effect. Note that the protection offered by the underlying database (row locking, document locking, whatever) does not help here: each individual write is safe, but the load-calculate-persist sequence as a whole is not.
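
The interleaving is easy to reproduce without any database. The sketch below uses a plain PHP array as a stand-in for the collection (an assumption for illustration only) and plays the two handlers step by step in the unlucky order.

```php
<?php
// Hypothetical in-memory "collection": product id => stored document.
$store = ['p1' => ['count' => 100]];

// Both handlers load before either one saves (PHP arrays are copied by value).
$first  = $store['p1'];
$second = $store['p1'];

// Each handler adds its rating to its own copy.
$first['count']++;
$second['count']++;

// Last one wins: the second save overwrites the first.
$store['p1'] = $first;
$store['p1'] = $second;

echo $store['p1']['count'], "\n"; // prints 101, although two ratings arrived
```
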

So how do we apply both ratings, even if they arrive at the same time? By retrying the whole load-calculate-persist process whenever a concurrent update is detected. This is called optimistic locking.

For this to work you have to detect concurrent updates and refuse to write if another update got in first. To do that, you need a version property on your entity. It holds the version that existed when the entity was loaded; when you persist your changes, you check that the stored version is still equal to the loaded one. If the stored entity has a newer version, your calculations were based on a stale copy and you must retry.
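
Before bringing MongoDB into it, here is a rough in-memory sketch of the version check and the retry loop. InMemoryStore, saveIfVersionMatches and the $beforeSave hook are all hypothetical names I made up for this illustration; the hook only exists to force a competing write between our load and our save.

```php
<?php
// Hypothetical in-memory store standing in for a database collection.
final class InMemoryStore
{
    private $documents = [];

    public function insert(string $id, array $fields): void
    {
        $this->documents[$id] = $fields + ['version' => 0];
    }

    public function load(string $id): array
    {
        return $this->documents[$id];
    }

    // Writes only if the stored version still equals the one that was loaded.
    public function saveIfVersionMatches(string $id, array $fields, int $loadedVersion): bool
    {
        if ($this->documents[$id]['version'] !== $loadedVersion) {
            return false; // somebody else wrote first
        }
        $fields['version'] = $loadedVersion + 1;
        $this->documents[$id] = $fields;

        return true;
    }
}

// Load, calculate, persist; repeat the whole cycle if the version check fails.
function addRating(InMemoryStore $store, string $id, ?callable $beforeSave = null): int
{
    $attempts = 0;
    do {
        $attempts++;
        $doc = $store->load($id);
        $loadedVersion = $doc['version'];
        $doc['count']++;
        if ($beforeSave !== null && $attempts === 1) {
            $beforeSave(); // simulate a concurrent writer on the first attempt
        }
    } while (!$store->saveIfVersionMatches($id, $doc, $loadedVersion));

    return $attempts;
}

$store = new InMemoryStore();
$store->insert('p1', ['count' => 100]);

// A competing writer sneaks in once, between our load and our save.
$attempts = addRating($store, 'p1', function () use ($store) {
    addRating($store, 'p1');
});

$final = $store->load('p1');
printf("count=%d version=%d attempts=%d\n", $final['count'], $final['version'], $attempts);
```

This time both ratings are counted: the first attempt is rejected because the stored version moved on, and the second attempt recalculates from the fresh copy.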

To extract this algorithm into a reusable class, we first define a Versionable interface:

interface Versionable
{
    public function getVersion();
}

Then, our product must implement this interface:

class Product implements Versionable
{
    /**
     * @var AverageRating
     */
    private $averageRating;

    /**
     * @var ProductId
     */
    private $productId;

    /**
     * @var int
     */
    private $version = 0;

    public function rate(int $rating)
    {
        $this->averageRating = $this->averageRating->addRating($rating);
    }
    
    public function getVersion(): int
    {
        return $this->version;
    }
}

Our refactored product list would look like this:

    
    public function onProductWasRated(ProductWasRated $event)
    {
        $this->update(
            $event->getProductId(),
            null,//the factory, do not create the product if it does not exist before rating
            function (Product $product) use ($event) { //the updater: must return the updated product
                $product->rate($event->getRating());

                return $product;
            }
        );
    }

    private function update(ProductId $id, ?callable $factory, callable $updater)
    {
        $this->updater->addOrUpdate(
            $this->getCollection(),
            $id,
            function ($id) {
                return $this->loadProduct($id);
            },
            $factory,
            $updater,
            function ($entity) {
                return $this->serializer->convert($entity);
            }
        );
    }

It is OK to use callables without type hints here because this is a private matter. To keep the event handler at a single level of abstraction, I extracted a new private method, update. It accepts the ProductId, the factory callable (called if the product does not exist yet) and the updater callable, which performs the actual update on the product and returns it.

And now the updater:

use MongoDB\BSON\ObjectId;
use MongoDB\BSON\UTCDateTime;
use MongoDB\Collection;
use MongoDB\Driver\Exception\BulkWriteException;

class OptimisticMongoDocumentUpdater
{
    public function addOrUpdate(Collection $collection, $id, callable $loader, ?callable $factory, callable $updater, callable $serializer)
    {
        /**
         * We try to add/update the entity in a concurrency-safe manner
         * using optimistic locking: we always try to update the version we loaded;
         * if a concurrent write has finished before us in the meantime,
         * we retry the *whole* update process
         */
        do {
            /** @var Versionable $entity */
            $entity = $loader($id);
            if (!$entity) {
                if (!$factory) {
                    return;//do not create if factory does not exist
                }
                $entity = $factory();
                $version = 0;
            } else {
                $version = $entity->getVersion();
                $entity = $updater($entity);
            }

            $serialized = $serializer($entity);

            //the version is managed by $inc below, so it must not also appear in $set
            unset($serialized['version']);
            $serialized['lastModified'] = new UTCDateTime();

            try {
                $result = $collection->updateOne(
                    [
                        '_id'     => new ObjectId($id),
                        'version' => $version,
                    ],
                    [
                        '$set' => $serialized,
                        '$inc' => ['version' => 1],
                    ],
                    [
                        'upsert' => true,
                    ]
                );
                //no side effect? then concurrent update -> retry
                $retry = 0 == $result->getMatchedCount() && 0 == $result->getUpsertedCount();
            } catch (BulkWriteException $e) {
                //with upsert, a stale version makes MongoDB try to insert a document
                //with an already existing _id; this fails with a duplicate key error (11000)
                $errors = $e->getWriteResult()->getWriteErrors();
                if (!$errors || 11000 !== $errors[0]->getCode()) {
                    throw $e;
                }
                $retry = true;//concurrent update -> retry
            }
        } while ($retry);
    }
}

Pretty nice, right? 🙂

The idea of this updater is that it only ever updates the known version of an entity. If that version no longer exists, the whole process is restarted.
Another important detail is that the version is incremented atomically, at the same time as the actual update, using the $inc operator.
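
To see the shape of what is sent to MongoDB, here is a small sketch that only builds the filter and update documents. The function buildVersionedUpdate is a name I made up, and I assume plain string ids and leave out lastModified so that it runs without the MongoDB extension. The version is removed from the $set part because MongoDB rejects an update in which two operators touch the same field.

```php
<?php
// Hypothetical helper: builds the filter and update documents for a version-checked write.
// Assumption: plain string ids and no lastModified field, to stay runnable without ext-mongodb.
function buildVersionedUpdate(string $id, int $loadedVersion, array $serialized): array
{
    // The version is owned by $inc; setting it through $set as well would conflict.
    unset($serialized['version']);

    // Match only the document version that was loaded.
    $filter = ['_id' => $id, 'version' => $loadedVersion];

    // Write the new state and bump the version in the same atomic update.
    $update = [
        '$set' => $serialized,
        '$inc' => ['version' => 1],
    ];

    return [$filter, $update];
}

[$filter, $update] = buildVersionedUpdate(
    'p1',
    3,
    ['average' => 4.25, 'count' => 4, 'version' => 3]
);

echo json_encode(['filter' => $filter, 'update' => $update]), "\n";
```
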
You now have a concurrency-safe MongoDB entity updater 🙂
