Laravelで作る分析・分散処理アプリケーション その1

先日のPHPカンファレンスPHPカンファレンス関西、buildersconでお話しした内容を元にして、
Laravel(PHP)を使って分析処理の簡単な実装や、
ミドルウェアを組み合わせた分散処理の実装を紹介します。

本ブログのサンプルアプリケーションは下記になりますので、
コードやミドルウェアなどを参照ください。

github.com

Laravelとkafka Connect、Elasticsearchの組み合わせ

Apache Kafkaを使ったスケーラブルなアプリケーションの入門編です。

レコード量が多い複雑なコンテンツのデータや検索要件、Like検索など、

RDBMSの不得意な分野などを対応することも多いかと思いますが、
RDBMSとElasticsearchを併用しKafkaで複雑さを吸収して、
アプリケーションをスケールさせるようにしてみましょう。

データベースのテーブル設計時に想定されるデータモデリングと、
サービスが成長することによってデータの複雑化と、検索の複雑さが増し、
ビジネス要件がより高度になっていきます。
これらを解消するために全文検索を導入するなどが考えられますが、
このサンプルではそういったデータストレージが異なる場合でも、
CQRS+ESライクに問題を解決するヒントになればと思います。

サンプルアプリケーションでは /fulltext 配下のurlが該当します。

Kafka Producerの実装

いわゆるMessage Queueのメッセージ送信を実装します。

LaravelのQueueを想像されるかもしれませんが、フレームワークのQueueではなく、
こういった処理は原則他の言語でも利用できるようにする必要がありますので、
フレームワークの知識をメッセージに混入させることなく実装するようにします。

Producerの実装自体は難しいものではありません。

<?php
declare(strict_types=1);

namespace App\Foundation\Producer;

use Psr\Log\LoggerInterface;
use RdKafka\Conf;
use RdKafka\Producer as KafkaProducer;
use RdKafka\Producer as RdkafkaProducer;
use RdKafka\ProducerTopic;

/**
 * Class Producer
 */
class Producer
{
    /** @var RdkafkaProducer */
    protected $producer;

    /** @var string */
    protected $topic = 'default';

    /** @var null|LoggerInterface  for optional logger */
    protected $logger;

    /** @var string */
    protected $brokers;

    /** @var array */
    protected $options;

    /**
     * Producer constructor.
     *
     * @param string $topic
     * @param string $brokers
     * @param array  $options
     */
    public function __construct(string $topic, string $brokers, array $options = [])
    {
        $this->topic = $topic;
        $this->brokers = $brokers;
        $this->options = $options;
    }

    /**
     * @param AbstractProduceDefinition $definition
     */
    public function produce(AbstractProduceDefinition $definition)
    {
        $kafkaTopic = $this->producerTopic();
        $kafkaTopic->produce(RD_KAFKA_PARTITION_UA, 0, $definition->payload());
        if ($this->logger instanceof LoggerInterface) {
            $this->logger->info($definition->payload());
        }
        $this->producer->poll(0);
    }

    /**
     * @param LoggerInterface $logger
     */
    public function setLogger(LoggerInterface $logger)
    {
        $this->logger = $logger;
    }

    /**
     * @return ProducerTopic
     */
    protected function producerTopic(): ProducerTopic
    {
        $this->producer = $this->producer();
        $this->producer->setLogLevel(LOG_DEBUG);
        $this->producer->addBrokers($this->brokers);

        return $this->producer->newTopic($this->topic);
    }

    /**
     * @return KafkaProducer
     */
    protected function producer(): KafkaProducer
    {
        $conf = new Conf();
        foreach ($this->options as $key => $item) {
            $conf->set($key, $item);
        }

        return new KafkaProducer($conf);
    }
}

Command 実装

ここで指すCommandとは、artisan commandのアプリケーションではなく、
CQRSのCommandとQuery、データの書き込みと読み込みを分離して実装します。

以下は登録処理のコントローラクラスです。

<?php
declare(strict_types=1);

namespace App\Http\Controllers\Fulltext;

use App\Events\SinkConnect;
use App\Http\Controllers\Controller;
use App\Http\Requests\FulltextRequest;
use Illuminate\Contracts\Events\Dispatcher;
use Illuminate\Http\RedirectResponse;
use Illuminate\Routing\Redirector;

/**
 * Class RegisterAction
 */
final class RegisterAction extends Controller
{
    /** @var Dispatcher */
    private $dispatcher;

    /** @var Redirector */
    private $redirector;

    /**
     * RegisterAction constructor.
     *
     * @param Dispatcher $dispatcher
     * @param Redirector $redirector
     */
    public function __construct(Dispatcher $dispatcher, Redirector $redirector)
    {
        $this->dispatcher = $dispatcher;
        $this->redirector = $redirector;
    }

    /**
     * @param FulltextRequest $request
     *
     * @return RedirectResponse
     */
    public function __invoke(FulltextRequest $request): RedirectResponse
    {
        // 登録処理後に実行されるevent
        $this->dispatcher->dispatch(
            new SinkConnect(strval($request->get('fulltext')))
        );

        return $this->redirector->route('fulltext.index');
    }
}

サンプルではデータ書き込み(RDBMS)は省略していますが、
上記のコードの __invoke メソッドに記述するだけです。

何か登録処理が行われたものとして、その後にEventを発動しています。
このコントローラクラスでは、フロントで送信された文章(ブログの記事など)を保存する、
という機能を提供していますが、Kafkaへの通知はアプリケーションの要件ではなく、
システム都合の処理になりますので、ここのクラスではなく、Event Handlerが処理を行う様になっています。

このEventクラスはシンプルなクラスです。

<?php
declare(strict_types=1);

namespace App\Events;

/**
 * Class SinkConnect
 */
final class SinkConnect
{
    /** @var string */
    private $note;

    /**
     * SinkConnect constructor.
     *
     * @param string $note
     */
    public function __construct(string $note)
    {
        $this->note = $note;
    }

    /**
     * @return string
     */
    public function note(): string
    {
        return $this->note;
    }
}

このEventを処理するHandlerクラスがKafkaのメッセージを送信します。

Event Handler

SinkConnectイベントに反応して処理を行うクラスを実装します。

Handlerクラスから最初に紹介したKafka Producerを利用できる様に次の様に実装しています。

<?php
declare(strict_types=1);

namespace App\DataAccess;

use App\Foundation\Producer\Producer;
use App\Foundation\Producer\AbstractProduceDefinition;

/**
 * Class AbstractProduce
 */
abstract class AbstractProduce
{
    /** @var Producer */
    protected $producer;

    /**
     * MessageProduceUsecase constructor.
     *
     * @param Producer $producer
     */
    public function __construct(Producer $producer)
    {
        $this->producer = $producer;
    }

    /**
     * @param AbstractProduceDefinition $analyze
     */
    public function run(AbstractProduceDefinition $analyze)
    {
        $this->producer->produce($analyze);
    }
}

後述する分析処理にもKafkaを利用するためこのクラスを継承して利用します。

<?php
declare(strict_types=1);

namespace App\DataAccess;

/**
 * Class RegisterProduce
 */
final class RegisterProduce extends AbstractProduce
{

}

kafkaのtopicを処理によって切り替えるため、クラスを別クラスとして切り出しています。
このクラスを利用するHandlerクラスは以下のようになります。

<?php
declare(strict_types=1);

namespace App\Listeners;

use Ramsey\Uuid\Uuid;
use App\Events\SinkConnect;
use App\DataAccess\RegisterProduce;
use App\Definition\FulltextDefinition;

/**
 * Class SinkConnectHandler
 */
final class SinkConnectHandler
{
    /** @var RegisterProduce */
    protected $producer;

    /**
     * SinkConnectHandler constructor.
     *
     * @param RegisterProduce $producer
     */
    public function __construct(RegisterProduce $producer)
    {
        $this->producer = $producer;
    }

    /**
     * @param SinkConnect $connect
     */
    public function handle(SinkConnect $connect)
    {
        $this->producer->run(
            new FulltextDefinition(Uuid::uuid4()->toString(), $connect->note())
        );
    }
}

送信が可能な状態になりましたが、接続情報がないため、
これをServiceProviderを使って設定値を外から渡します。

設定値はconfig/kafka.phpに配置します

<?php

return [
    'topics'   => [
       'fulltext.register' => [
            'topic'   => 'fulltext.register',
            'brokers' => '127.0.0.1',
            'options' => [
                'socket.blocking.max.ms'       => '1',
                'queue.buffering.max.ms'       => '1',
                'queue.buffering.max.messages' => '1000',
                'client.id'                    => 'testingClient',
            ],
        ],
    ]
];

この設定値を使い、RegisterProduceクラスに与えます

<?php

class AppServiceProvider extends ServiceProvider
{
    /**
     * Register any application services.
     *
     * @return void
     */
    public function register()
    {
        $this->app->when(RegisterProduce::class)
            ->needs(Producer::class)
            ->give(function (Application $app) {
                $kafkaConfig = $app['config']->get('kafka');
                $topic = $kafkaConfig['topics']['fulltext.register'];
                $producer = new Producer($topic['topic'], $topic['brokers'], $topic['options']);
                $producer->setLogger($app['log']);

                return $producer;
            });
    }
}

これでEventが発動するとHandlerクラスが反応し、Kafkaへメッセージが送信されます。
格納されるKafkaのtopicは fulltext.register です。 ここで送信されるメッセージには、uuidと、フォームで入力された文字列となります。

Kafka Connectの設定

送信されたメッセージをElasticsearchに送信するためにKafka Connect Elasticsearchの設定を行います。

github.com

これはConfluentをインストールすると含まれますので、追加で入れる必要はありません。

サンプルアプリケーションにはConfluentやElasticsearchも含まれています

ここでは一台で動かすためStandaloneモードで起動させます。

connect-standalone.propertiesファイルを作成して、以下の内容を記述します。

bootstrap.servers=192.168.10.10:9092

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000

rest.port=8093

kafka connectを利用するにはconverterがいくつか種類があり、代表的なものはavroですが、
ここではjsonConverterを指定します。(avro利用例は公式を参照ください)

Elasticsearch Connector — Confluent Platform 3.3.0 documentation

このファイルを /etc/schema-registry/connect-standalone.properties として設置します。
*サンプルは設置済みです。

次にkafka connectで直接elasticsearchに接続して、インデックスにデータを追加する設定を記述します。
elasticsearch-connect.propertiesファイルを作成しkafkaのtopic情報などを記述します。

name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector

tasks.max=1
topics=fulltext.register
key.ignore=true
connection.url=http://192.168.10.10:9200

schema.ignore=true
type.name=kafka-connect

上記の内容で、elasticsearchにKafkaのtopicと同じく、fulltext.register index が作成されます。
documentの_typeは kafka-connect となります。

これを /etc/kafka-connect-elasticsearch/elasticsearch-connect.properties として設置します。

次にKafkaに上記のKafka Connectを登録します。

sudo connect-standalone -daemon /etc/schema-registry/connect-standalone.properties /etc/kafka-connect-elasticsearch/elasticsearch-connect.properties
sudo confluent load elasticsearch-sink

daemonでKafka Connect Elasticsearchを起動し、 ConfluentにConnectorをLoadして登録します。

この状態で、Laravelで実装した処理を実行するとElasticsearchのindexに挿入されていきます。

Query実装

CQRSのQuery、データの読み込みを実装します。

LaravelのElasticsearchパッケージなどでも簡単に操作ができます。
サンプルではElasticsearchのphpクライアントライブラリを利用して実装しています。

<?php
declare(strict_types=1);

namespace App\DataAccess;

use Acme\Blog\Entity\EntryCriteria;
use App\Foundation\Elasticsearch\ElasticseachClient;

/**
 * Class FulltextIndex
 */
class FulltextIndex implements EntryCriteria
{
    /** @var ElasticseachClient */
    protected $client;

    /** @var string */
    protected $index = 'fulltext.register';

    /**
     * FulltextIndex constructor.
     *
     * @param ElasticseachClient $client
     */
    public function __construct(ElasticseachClient $client)
    {
        $this->client = $client;
    }

    /**
     * @return array
     */
    public function all(): array
    {
        $result = $this->client->client()->search([
            "index"  => $this->index,
            'type'   => 'kafka-connect',
            "body"   => [
                "query" => [
                    "match_all" => new \stdClass(),
                ],
            ],
        ]);
        $map = [];
        if (count($result)) {
            foreach($result['hits']['hits'] as $hit) {
                $map[] = $hit['_source'];
            }
        }

        return $map;
    }

    public function queryBy(string $string)
    {
        // TODO: Implement queryBy() method.
    }
}

Elasticsearchに問い合わせた結果は、
src/Entry (ドメイン)配下でRepositoryなどを経由してControllerを介してhtml出力されます。
詳細な実装はサンプルコードを参照ください

<?php
declare(strict_types=1);

namespace App\Http\Controllers\Fulltext;

use Acme\Blog\Specification\ActiveEntrySpecification;
use Acme\Blog\Usecase\RetrieveEntryUsecase;
use App\Http\Controllers\Controller;
use App\Http\Responders\HtmlResponder;
use Illuminate\Http\Response;

/**
 * Class IndexAction
 */
final class IndexAction extends Controller
{
    /** @var ActiveEntrySpecification */
    private $specification;

    /** @var RetrieveEntryUsecase */
    private $usecase;

    /**
     * IndexAction constructor.
     *
     * @param ActiveEntrySpecification $specification
     * @param RetrieveEntryUsecase     $usecase
     */
    public function __construct(
        ActiveEntrySpecification $specification,
        RetrieveEntryUsecase $usecase
    ) {
        $this->specification = $specification;
        $this->usecase = $usecase;
    }

    /**
     * @param HtmlResponder $responder
     *
     * @return Response
     */
    public function __invoke(HtmlResponder $responder): Response
    {
        $responder->template('fulltext.index');

        return $responder->emit([
            'list' => $this->usecase->run($this->specification),
        ]);
    }
}

アプローチ

このサンプルではKafka Connectを使ってデータの分散を行い、
RDBMSとElasticsearchの責務を分割した例を紹介しました。

小さなアプリケーションではオーバスペックな実装ですが、
確実なデータと検索に特化したミドルウェアを組み合わせて、データ上のパフォーマンスと堅実さを提供することができます。
また分散することで、どちらかに障害が発生した場合でもアプリケーションの動作を担保したり、
障害復旧などにも活かすことができるかと思います。

次回は物理的に分散したデータベースをPrestoで集約させて、Kafka Consumer経由でElasticsearchに格納する実装例を紹介します。