Skip to content

Commit

Permalink
implement event stream as an iterator
Browse files Browse the repository at this point in the history
  • Loading branch information
prolic committed Sep 25, 2015
1 parent 9054385 commit 52c7bb1
Show file tree
Hide file tree
Showing 4 changed files with 224 additions and 59 deletions.
11 changes: 7 additions & 4 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,20 @@
],
"require": {
"php": ">=5.5",
"prooph/event-store": "dev-develop",
"doctrine/dbal": "^2.4",
"beberlei/assert": "^2.0",
"prooph/common": "^3.5",
"container-interop/container-interop" : "~1.1"
"prooph/common": "^3.5"
},
"require-dev": {
"prooph/event-store": "dev-master",
"phpunit/phpunit": "4.7.*",
"container-interop/container-interop": "^1.1",
"phpunit/phpunit": "4.8.*",
"fabpot/php-cs-fixer": "1.7.*",
"satooshi/php-coveralls": "dev-master"
},
"suggest": {
"container-interop/container-interop": "For usage of provided factories"
},
"autoload": {
"psr-4": {
"Prooph\\EventStore\\Adapter\\Doctrine\\": "src/"
Expand Down
60 changes: 22 additions & 38 deletions src/DoctrineEventStoreAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

use Doctrine\DBAL\Connection;
use Doctrine\DBAL\Schema\Schema;
use Iterator;
use Prooph\Common\Messaging\Message;
use Prooph\Common\Messaging\MessageConverter;
use Prooph\Common\Messaging\MessageDataAssertion;
Expand Down Expand Up @@ -55,11 +56,6 @@ final class DoctrineEventStoreAdapter implements Adapter, CanHandleTransaction
*/
private $payloadSerializer;

/**
* @var array
*/
private $standardColumns = ['event_id', 'event_name', 'created_at', 'payload', 'version'];

/**
* @param Connection $dbalConnection
* @param MessageFactory $messageFactory
Expand Down Expand Up @@ -88,7 +84,7 @@ public function __construct(
*/
public function create(Stream $stream)
{
if (count($stream->streamEvents()) === 0) {
if (!$stream->streamEvents()->valid()) {
throw new RuntimeException(
sprintf(
"Cannot create empty stream %s. %s requires at least one event to extract metadata information",
Expand All @@ -98,7 +94,7 @@ public function create(Stream $stream)
);
}

$firstEvent = $stream->streamEvents()[0];
$firstEvent = $stream->streamEvents()->current();

$this->createSchemaFor($stream->streamName(), $firstEvent->metadata());

Expand All @@ -107,11 +103,11 @@ public function create(Stream $stream)

/**
* @param StreamName $streamName
* @param Message[] $streamEvents
* @param Iterator $streamEvents
* @throws \Prooph\EventStore\Exception\StreamNotFoundException If stream does not exist
* @return void
*/
public function appendTo(StreamName $streamName, array $streamEvents)
public function appendTo(StreamName $streamName, Iterator $streamEvents)
{
foreach ($streamEvents as $event) {
$this->insertEvent($streamName, $event);
Expand All @@ -134,7 +130,7 @@ public function load(StreamName $streamName, $minVersion = null)
* @param StreamName $streamName
* @param array $metadata
* @param null|int $minVersion
* @return Message[]
* @return Iterator
*/
public function loadEventsByMetadataFrom(StreamName $streamName, array $metadata, $minVersion = null)
{
Expand All @@ -160,35 +156,9 @@ public function loadEventsByMetadataFrom(StreamName $streamName, array $metadata

/* @var $stmt \Doctrine\DBAL\Statement */
$stmt = $queryBuilder->execute();
$stmt->setFetchMode(\PDO::FETCH_ASSOC);

$events = [];

foreach ($stmt->fetchAll(\PDO::FETCH_ASSOC) as $eventData) {
$payload = $this->payloadSerializer->unserializePayload($eventData['payload']);

//Add metadata stored in table
foreach ($eventData as $key => $value) {
if (! in_array($key, $this->standardColumns)) {
$metadata[$key] = $value;
}
}

$createdAt = \DateTimeImmutable::createFromFormat(
'Y-m-d\TH:i:s.u',
$eventData['created_at'],
new \DateTimeZone('UTC')
);

$events[] = $this->messageFactory->createMessageFromArray($eventData['event_name'], [
'uuid' => $eventData['event_id'],
'version' => (int)$eventData['version'],
'created_at' => $createdAt,
'payload' => $payload,
'metadata' => $metadata
]);
}

return $events;
return new DoctrineStreamIterator($stmt, $this->messageFactory, $this->payloadSerializer, $metadata);
}

/**
Expand All @@ -214,6 +184,11 @@ public function createSchemaFor(StreamName $streamName, array $metadata, $return
}
}

/**
* @param Schema $schema
* @param string $table
* @param array $metadata
*/
public static function addToSchema(Schema $schema, $table, array $metadata)
{
$table = $schema->createTable($table);
Expand All @@ -232,6 +207,9 @@ public static function addToSchema(Schema $schema, $table, array $metadata)
$table->setPrimaryKey(['event_id']);
}

/**
* Begin transaction
*/
public function beginTransaction()
{
if (0 != $this->connection->getTransactionNestingLevel()) {
Expand All @@ -241,11 +219,17 @@ public function beginTransaction()
$this->connection->beginTransaction();
}

/**
* Commit transaction
*/
public function commit()
{
$this->connection->commit();
}

/**
* Rollback transaction
*/
public function rollback()
{
$this->connection->rollBack();
Expand Down
155 changes: 155 additions & 0 deletions src/DoctrineStreamIterator.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
<?php
/*
* This file is part of the prooph/event-store-mongodb-adapter.
* (c) 2014 - 2015 prooph software GmbH <contact@prooph.de>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*
* Date: 09/25/15 - 15:29
*/

namespace Prooph\EventStore\Adapter\Doctrine;

use Iterator;
use Doctrine\DBAL\Driver\PDOStatement;
use Prooph\Common\Messaging\Message;
use Prooph\Common\Messaging\MessageFactory;
use Prooph\EventStore\Adapter\PayloadSerializer;

/**
* Class DoctrineStreamIterator
* @package Prooph\EventStore\Adapter\Doctrine
*/
final class DoctrineStreamIterator implements Iterator
{
/**
* @var PDOStatement
*/
private $statement;

/**
* @var MessageFactory
*/
private $messageFactory;

/**
* @var PayloadSerializer
*/
private $payloadSerializer;

/**
* @var array
*/
private $metadata;

/**
* @var array
*/
private $standardColumns = ['event_id', 'event_name', 'created_at', 'payload', 'version'];

/**
* @var array|false
*/
private $currentItem;

/**
* @var int
*/
private $currentKey = -1;

/**
* @param PDOStatement $statement
* @param MessageFactory $messageFactory
* @param PayloadSerializer $payloadSerializer
* @param array $metadata
*/
public function __construct(
PDOStatement $statement,
MessageFactory $messageFactory,
PayloadSerializer $payloadSerializer,
array $metadata
) {
$this->statement = $statement;
$this->messageFactory = $messageFactory;
$this->payloadSerializer = $payloadSerializer;
$this->metadata = $metadata;

$this->next();
}

/**
* @return null|Message
*/
public function current()
{
if (false === $this->currentItem) {
return null;
}

$payload = $this->payloadSerializer->unserializePayload($this->currentItem['payload']);

//Add metadata stored in table
foreach ($this->currentItem as $key => $value) {
if (! in_array($key, $this->standardColumns)) {
$metadata[$key] = $value;
}
}

$createdAt = \DateTimeImmutable::createFromFormat(
'Y-m-d\TH:i:s.u',
$this->currentItem['created_at'],
new \DateTimeZone('UTC')
);

return $this->messageFactory->createMessageFromArray($this->currentItem['event_name'], [
'uuid' => $this->currentItem['event_id'],
'version' => (int) $this->currentItem['version'],
'created_at' => $createdAt,
'payload' => $payload,
'metadata' => $this->metadata
]);
}

/**
* Next
*/
public function next()
{
$this->currentItem = $this->statement->fetch();

if (false !== $this->currentItem) {
$this->currentKey++;
} else {
$this->currentKey = -1;
}
}

/**
* @return bool|int
*/
public function key()
{
if (-1 === $this->currentKey) {
return false;
}

return $this->currentKey;
}

/**
* @return bool
*/
public function valid()
{
return false !== $this->currentItem;
}

/**
* Rewind (does nothing)
*/
public function rewind()
{
// do nothing
}
}
Loading

0 comments on commit 52c7bb1

Please # to comment.