ytake blog

Web Application Developer

気軽にHackチャレンジ マイクロフレームワーク公開

PHPと分離し始めたHHVM/Hackですが、
折角なので多くの方が やってみた で終わらないように、
シンプルで薄いマイクロなフレームワーク、というか、
Web Applicationのボイラープレートと言ってもいいくらいの簡単なものを公開しました。

github.com

*名前は 指輪物語 より

機能を備えたフレームワークを作るよりも、
最近はコンポーネントなどを組み合わせる開発者も多いため、
最低限のリクエスト・レスポンス以外の機能を付け加える予定はありませんが、
とっかかりには小さくチャレンジできるのではと思います。

簡単な使い方などを紹介します。

HHVM環境構築

Ubuntu16.04などのサーバが手元にある方は、簡単にHHVMの環境が構築できます

Installation: Introduction

またはVagrantで簡単に構築することもできます。
公開しているytake/gardening-hhvmを利用すると、3.23.4の環境が起動します。

$ vagrant box add ytake/gardening-hhvm

Vagrantの詳細については gardening-hhvm#install-gardening-box

Dockerについては後日

install

composer を使ってcreate-projectをする場合は、次のコマンドを実行します。

$ hhvm -d xdebug.enable=0 -d hhvm.jit=0 -d hhvm.php7.all=1 -d hhvm.hack.lang.auto_typecheck=0 \
 $(which composer) create-project nazg/skeleton [アプリケーション名] --prefer-dist

依存しているPSR-15インターフェースがphp7以上となっているため、
PHP7モードで実行する必要があります。

簡単にAPIを作ってみよう

環境構築後は実際に実装するだけです。
簡単なAPIを実装します。

処理フロー

このフレームワークAPIやそこまで大きくない規模のアプリケーション利用を想定して、
ADRを採用しています。

まずはAction、Routerです。

Action

Zend Expressiveのようにこのフレームワークにおいても、
アクションはあくまで一つのミドルウェアにすぎず、PSR-15(現在ドラフト)を採用しています。

これにならって、まずはActionを用意します。
src/Action/ReadAction.php として下記のものを記述します。

<?hh

namespace App\Action;

use App\Responder\IndexResponder;
use Psr\Http\Message\ResponseInterface;
use Psr\Http\Message\ServerRequestInterface;
use Psr\Http\Server\MiddlewareInterface;
use Psr\Http\Server\RequestHandlerInterface;
use Zend\Diactoros\Response\JsonResponse;

final class ReadAction implements MiddlewareInterface {

  public function process(
    ServerRequestInterface $request,
    RequestHandlerInterface $handler,
  ): ResponseInterface {
    return new JsonResponse([]);
  }
}

Hackのコードだけであれば、 <?hh // strict で厳格モードで実装ができますが、
PHPのライブラリが含まれる場合は、厳格モードにせずに実装します
PSR-7に準拠したライブラリで、HHVM/Hackで動作するものであればなにを利用しても構いません。
デフォルトでは zendframework/zend-diactoros になります。

このActionをRouterに登録します。

Router

デフォルトでは config/routes.global.php に記述するだけです
GETリクエストで作用するActionとして下記の通りに記述します。

<?hh

return [
  \Nazg\Foundation\Service::ROUTES => ImmMap {
    \Nazg\Http\HttpMethod::GET => ImmMap {
      '/' => ImmVector{ App\Action\IndexAction::class },
      '/sample' => ImmVector{ App\Action\ReadAction::class },
    },
  },
];

Register Container

利用準備はこれで整いますが、
このフレームワークでは簡単なDependency Injectionをサポートしており、
インスタンスの生成方法を指定する必要があります。
LaravelのようなAuto Wiringはないため、記述していないクラスは生成することができません。

デフォルトでは src/Module/ActionServiceModule.php が用意されていますので、
そちらに追記します。

<?hh // strict

namespace App\Module;

use App\Action\IndexAction;
use App\Action\ReadAction;
use App\Responder\IndexResponder;
use Ytake\HHContainer\Scope;
use Ytake\HHContainer\ServiceModule;
use Ytake\HHContainer\FactoryContainer;

final class ActionServiceModule extends ServiceModule {

  public function provide(FactoryContainer $container): void {
    $container->set(
      IndexAction::class,
      $container ==> new IndexAction(new IndexResponder()),
      Scope::PROTOTYPE,
    );
    // 追加したActionのインスタンス生成方法を記述
    $container->set(
      ReadAction::class,
      $container ==> new ReadAction(),
      Scope::PROTOTYPE,
    );
  }
}

Scopeは指定しない場合は都度インスタンスを生成するPrototypeになりますが、
Singletonを望む場合は、 Scope::SINGLETON を指定してください。

Containerの詳細な使い方については、
ytake/hh-container を参照ください。

クラス追加時に忘れずにdump-autoload

このフレームワークはHackに最適化されたcomposerプラグインのhhvm-autoloadを利用しています。

github.com

クラス追加時は、以下のコマンドを必ず実行して、hh_autoload.phpに反映してください。

hhvm -d xdebug.enable=0 -d hhvm.jit=0 -d hhvm.php7.all=1 -d hhvm.hack.lang.auto_typecheck=0 $(which composer) dump-autoload

実行後は追加した /sample にアクセスしてみてください。
空のJson配列が返却されているはずです。

Hackならではの機能を使ってみよう

HackにはShapeという配列に対しての型をチェックするものがあり、
配列に対しても厳格さを要求することができます。

APIを開発する際に、あるカラムにstringやintが混在し、
AndroidiOSの開発者に注意されることなどもあるのではないかと思いますが、
そう云うケースや、バリデーションに利用することができます。

ここではレスポンスに対して、期待通りのレスポンスを返しているか
チェックするミドルウェアを追加してみましょう

TypeAssert

hhvm/type-assert を使って厳格に調べるように実装し、
configファイルで特定のrouteにのみ作用するように記述します。

<?hh // strict

namespace App\Middleware;

use Facebook\TypeAssert;
use Psr\Http\Message\ResponseInterface;
use Psr\Http\Message\ServerRequestInterface;
use Psr\Http\Server\MiddlewareInterface;
use Psr\Http\Server\RequestHandlerInterface;

class TypeAssertMiddleware implements MiddlewareInterface {

  const type ReadStructure = shape('name' => string,);

  public function process(
    ServerRequestInterface $request,
    RequestHandlerInterface $handler,
  ): ResponseInterface {
    $response = $handler->handle($request);
    $decode = json_decode($response->getBody()->getContents(), true);
    TypeAssert\matches_type_structure(
      type_structure(self::class, 'ReadStructure'),
      $decode,
    );
    return $response;
  }
}

配列に期待する型をshapeで記述しています。
この例では、配列の中の name はstringであることとなります。

const type ReadStructure = shape('name' => string,);

先ほどの例と同様に、config/routes.global.phpや、
MiddlewareServiceModule.phpを作成し、アプリケーションに登録します。

MiddlewareServiceModule

デフォルトのActionServiceModule.phpと同様のクラスを作成します

<?hh // strict

namespace App\Module;

use App\Middleware\TypeAssertMiddleware;
use App\Responder\IndexResponder;
use Ytake\HHContainer\Scope;
use Ytake\HHContainer\ServiceModule;
use Ytake\HHContainer\FactoryContainer;

final class MiddlewareServiceModule extends ServiceModule {

  public function provide(FactoryContainer $container): void {
    $container->set(
      TypeAssertMiddleware::class,
      $container ==> new TypeAssertMiddleware(),
    );
  }
}

各configに追記します。

config/module.global.php

依存解決方法を記載したServiceModuleクラスを追加します

return [
  \Nazg\Foundation\Service::MODULES => [
    \App\Module\ActionServiceModule::class,
    \App\Module\MiddlewareServiceModule::class,    
  ],
];

config/routes.global.php

特定のrouteで作用するように、ImmVectorに追記します。
Actionクラスを挟み込むように作用させるには、Actionクラスよりも前に指定します。
Action, Middlewareはここで指定した順番で実行されます。

<?hh

return [
  \Nazg\Foundation\Service::ROUTES => ImmMap {
    \Nazg\Http\HttpMethod::GET => ImmMap {
      '/' => ImmVector{ 
        \App\Middleware\TypeAssertMiddleware::class, 
        \App\Action\IndexAction::class,
      },
      '/sample' => ImmVector{ App\Action\ReadAction::class },
    },
  },
];

これで /sample にアクセスすると 配列に期待している型とは異なっているため、
Facebook\TypeAssert\IncorrectTypeException がスローされます。

これを回避するため、src/Action/ReadAction.php で返却されるレスポンスを変更します。

<?hh // strict

// 省略
final class ReadAction implements MiddlewareInterface {

  public function process(
    ServerRequestInterface $request,
    RequestHandlerInterface $handler,
  ): ResponseInterface {
    return new JsonResponse(['name' => 'ytake']);
  }
}

これで /sample にアクセスすると期待通りの型になったことにより、
通常のjsonのレスポンスが返却されます。

今回は簡単なHackによるアプリケーション開発を紹介しました

New Year's Resolution 2018

2018年になりましたので、今年の抱負とか

アウトプット

登壇とか

引き続きPHP系のカンファレンスにはお邪魔しながら、
登壇なりをしていこうと思ってます。
去年はビッグデータ系のミドルウェアアーキテクチャが中心でした。

面白いテーマではありながらも、Hadoop周りのシステムや他言語もある程度理解していないと難しいものでした。

今年はやはり、Hackかなぁ・・・
shapeを使った構造体ライクなアプローチとかは結構面白いと思うので、
そのあたりを交えたり、Hack面白そうじゃん!というところをもっと共有していきたいですね

今年もbuildersconまたいきたい・なんか喋りたいなぁ

ライブラリ

去年からScala周りといっても主にSparkや Kafkaですが、
そのあたりのものを作ったり、提供したりしようと思っています。

Hackも折角なので簡単に使えるものをいくつか作ろうと思ってます
2017年末〜年明けは実はHackでADR的に作るなら、
ということですごい簡単な例を公開しながら開発中です。
まだ作っている途中なのと、どこかでマイクロフレームワークライクに切り出すかもしれません

github.com

書き物

がんばる・・・

インプット

知識がそのままで止まることがないように、引き続き意識してやっていこう
ただもうフロントエンド開発はほとんどやらないので、Reactで止める

健康

11月中旬から椎間板ヘルニアがどうにもならなくなって、保存療法で入院したりしてました。
悪化しないように気をつけよう

ヘルニアで休んでいた期間に感じたのは、子供の圧倒的な成長速度
毎日終電までオフィスにいた生活ではわからなかったくらい、日々変わって成長していく子供
全然触れ合わずに仕事だけするというのもあまりよくない事だなと実感
変えてこう

あとはずっとやりたかったことができそうになってきているので、
本気で取り組んでいく

ytake/gardening(VagrantBox) 更新のお知らせ

laravel/homesteadのCentOS7版ともいえるVagrantBoxを公開していますが、
実はちゃんと定期的に更新しています。

ytake.hateblo.jp

app.vagrantup.com

利用したい場合は次のコマンドで追加してください。

$ vagrant box add ytake/gardening

更新内容

今回は0.x系から 1.0にバージョンをあげ、
以前のBoxから大幅に変更を行いました。

複数のPHPバージョンを一つに

今まで単一のバージョンだけで提供していましたが、
自分が使うから、ということもあり複数のバージョンを一つのBoxで提供することにしました。

とはいっても特殊なことはしておらず、remi-safeでPHP7.0からPHP7.2までの最新バージョンを含んでいます。
BoxのデフォルトはPHP7.2にしていますが、
Composerで提供しているライブラリで自由に変更できます。

github.com

packagist.org

これにより、現versionのHomesteadと同じようにプロジェクト毎に異なるPHPバージョンで開発することができます。
当然すべてのPHPバージョンでエクステンションは統一しています。

バージョン指定方法

VagrantBoxを追加したあと、Composerインストールで上記のライブラリを追加し、

$ ./vendor/bin/gardening gardening:setup

を実行すると、VagrantBoxの環境を変更できる設定ファイルが出力されます(vagrant.yaml)
sitesやfoldersはプロジェクトの内容が自動で記述されますが、 例としては次のようになります。

ip: 192.168.10.10
memory: 4096
cpus: 1
hostname: gardening
name: ytake-web-develop
authorize: ~/.ssh/id_rsa.pub
keys:
    - ~/.ssh/id_rsa
folders:
    - map: ./
      to: /home/vagrant/component-web

sites:
    - map: gardening.app.vagrant
      to: /home/vagrant/component-web/public
      php: "7.2"

php-alternatives: "7.1"
elasticsearch: true
kibana: true
fluentd: false
mongodb: false
couchbase: true
cassandra: true
confluent: false
rabbitmq: true
timezone: Asia/Tokyo

yamlphp-alternatives はオプションのため、デフォルトでは内容に含みませんが、
cliなどで利用する場合のデフォルトのバージョンを変更します。 上記の例では、あるプロジェクトでは7.2で動作させますが、メインとしては7.1で動作させることになります これは alternatives でバージョンを切り替えているだけですので、
各バージョンできちんとコマンドを実行したい場合は、
以下の各バージョンのディレクトリを参照してください。

各バージョンのディレクトリ、ソケットについて

php version bin dir fpm socket
php7.0 /opt/remi/php70/root/usr/bin /var/run/php70-fpm.sock
php7.1 /opt/remi/php71/root/usr/bin /var/run/php71-fpm.sock
php7.2 /opt/remi/php72/root/usr/bin /var/run/php72-fpm.sock

PHPエクステンションリスト

面倒臭いsqlserver向けのものなども当然含んでますので、開発は楽チンです。
Redisまわりのエクステンションはphpiredisとredis両方が含まれています。

amqp
apc
apcu
ast
bcmath
bz2
calendar
cassandra
Core
couchbase
ctype
curl
date
dom
event
exif
fileinfo
filter
ftp
gd
gettext
gmp
hash
iconv
igbinary
imagick
intl
json
ldap
libsodium
libxml
mbstring
memcached
memprof
mongodb
msgpack
mysqli
mysqlnd
openssl
pcntl
pcre
pcs
PDO
pdo_dblib
pdo_mysql
pdo_pgsql
pdo_sqlite
pdo_sqlsrv
pgsql
phalcon
Phar
phpiredis
posix
rdkafka
readline
redis
Reflection
session
shmop
SimpleXML
soap
sockets
sodium
solr
SPL
sqlite3
sqlsrv
ssh2
standard
Stomp
sysvmsg
sysvsem
sysvshm
tokenizer
uopz
uuid
wddx
xdebug
xhprof
xml
xmlreader
xmlrpc
xmlwriter
xsl
yaml
Zend OPcache
zip
zlib
zmq

Xdebugのポート変更

上記の複数のPHPバージョン提供のため、それぞれのプロジェクトで利用できるようにポートを下記にしています。

php version xdebug.remote_port
php7.0 xdebug.remote_port = 9070
php7.1 xdebug.remote_port = 9071
php7.2 xdebug.remote_port = 9072

これ以外は次の通りです。

xdebug.remote_enable = 1
xdebug.remote_connect_back = 1
xdebug.max_nesting_level = 512
xdebug.idekey = PHPSTORM

あくまでデフォルトで用意している値ですので、お使いの環境に合わせて自由に変更してください。

Elasticsearchバージョンアップ+Kibana

Elasticsearchのバージョンを6.1.1に更新しました。
またKibanaをVagrantBoxに含めるようにしましたので、
optionalで有効にすると利用できます。

elasticsearch: true
kibana: true

Kibana利用は以下にアクセスしてください

http://VagrantBoxのipアドレス:5601/

RabbitMQ, Apache Kafkaの追加

LaravelではフレームワークのQueueを利用する方も多いと思いますが、
様々なアプリケーション・言語と接続するには効率がいいものではありません。
そういう場合はメッセージブローカーを使うのが一般的ですが、
今回RabbitMQとApache Kafkaを追加しました。

RabbitMQを利用したい場合

rabbitmq: true

RabbitMQのWeb GUIアクセス方法

rabbitmqを有効にすると、以下のアドレスでアクセスできます。
http://VagrantBoxのipアドレス:15672/

ユーザー名とパスワードは次の通りです。

user: gardening
password: 00:secreT,@

Apache Kafkaを利用したい場合

confluent: true

Kafka GUIアクセス

Apache KafkaにはRabbitMQのようなGUIは提供されていません。
ですがthird partyのものを含めています。
実行する場合は下記のコマンドか、supervisorがインストールされていますのでそちらをご利用ください。

$ nohup /home/vagrant/trifecta-ui/bin/trifecta-ui -Dhttp.port=9888 > /var/log/trifecta/out.log &

上記のコマンドを実行すると、http://VagrantBoxのipアドレス:9888/ でアクセスできます。
クラスタの管理には yahoo/kafka-manager などを利用するといいでしょう

github.com

Apache Cassandra, Apache Sparkの追加

前述のApache Kafkaと組み合わせることで、Streamingアプリケーション、
Kappa Architectureを実装することができます

Cassandraを利用したい場合

cassandra: true

cqlsh(コマンドラインツール)は以下のように入力して利用してください

$ cqlsh VagrantBoxのipアドレス

Sparkを利用したい場合

Sparkを実行する場合は、特になにかを有効にする必要はありません

spark-shellなどは次のように入力するだけです

$ spark-shell

Sparkのディレクトリは export SPARK_HOME=/opt/spark となっています。

Optinalの詳細

今回紹介したもの以外のオプショナルなミドルウェアは下記を参照ください

github.com

Windowsについて

Windowsについてはきちんとサポートしておりません
Windows PCを所有していないため、動作確認などもできていません
きちんと対応してもいいよという方がいましたら是非お願いします

Laravelで作る分析・分散処理アプリケーション その2

Kafka Consumer + Prestodb例

ytake.hateblo.jp

上記のエントリの続編です。

その1 ではApache Kafkaを組み合わせて、
データの分散やアプリケーション自体をスケールするアプローチを紹介しました。

今回は分散したサービスのデータの集約をKafkaとPrestoを組み合わせて、
ログ分析の基盤作りの例を紹介します。

アプリケーションのログをfluentd, elasticsearchで収集し、サービズ作りに活かすケースは多いと思います。

今回の例ではログにサービス固有の情報、物理的に異なるデータベースを集め、
ログデータをKPIなどに活かせる形にし、elasticseachに格納します。

あるところでデータ更新などが行われた場合でも、
Kafkaを軸にメッセージを受信することでelasticsearchのドキュメント更新なども簡単に行えます。

続きを読む

Laravelで作る分析・分散処理アプリケーション その1

先日のPHPカンファレンスPHPカンファレンス関西、buildersconでお話しした内容を元にして、
Laravel(PHP)を使って分析処理の簡単な実装や、
ミドルウェアを組み合わせた分散処理の実装を紹介します。

本ブログのサンプルアプリケーションは下記になりますので、
コードやミドルウェアなどを参照ください。

github.com

Laravelとkafka Connect、Elasticsearchの組み合わせ

Apache Kafkaを使ったスケーラブルなアプリケーションの入門編です。

レコード量が多い複雑なコンテンツのデータや検索要件、Like検索など、

RDBMSの不得意な分野などを対応することも多いかと思いますが、
RDBMSとElasticsearchを併用しKafkaで複雑さを吸収して、
アプリケーションをスケールさせるようにしてみましょう。

データベースのテーブル設計時に想定されるデータモデリングと、
サービスが成長することによってデータの複雑化と、検索の複雑さが増し、
ビジネス要件がより高度になっていきます。
これらを解消するために全文検索を導入するなどが考えられますが、
このサンプルではそういったデータストレージが異なる場合でも、
CQRS+ESライクに問題を解決するヒントになればと思います。

サンプルアプリケーションでは /fulltext 配下のurlが該当します。

Kafka Producerの実装

いわゆるMessage Queueのメッセージ送信を実装します。

LaravelのQueueを想像されるかもしれませんが、フレームワークのQueueではなく、
こういった処理は原則他の言語でも利用できるようにする必要がありますので、
フレームワークの知識をメッセージに混入させることなく実装するようにします。

Producerの実装自体は難しいものではありません。

<?php
declare(strict_types=1);

namespace App\Foundation\Producer;

use Psr\Log\LoggerInterface;
use RdKafka\Conf;
use RdKafka\Producer as KafkaProducer;
use RdKafka\Producer as RdkafkaProducer;
use RdKafka\ProducerTopic;

/**
 * Class Producer
 */
class Producer
{
    /** @var RdkafkaProducer */
    protected $producer;

    /** @var string */
    protected $topic = 'default';

    /** @var null|LoggerInterface  for optional logger */
    protected $logger;

    /** @var string */
    protected $brokers;

    /** @var array */
    protected $options;

    /**
     * Producer constructor.
     *
     * @param string $topic
     * @param string $brokers
     * @param array  $options
     */
    public function __construct(string $topic, string $brokers, array $options = [])
    {
        $this->topic = $topic;
        $this->brokers = $brokers;
        $this->options = $options;
    }

    /**
     * @param AbstractProduceDefinition $definition
     */
    public function produce(AbstractProduceDefinition $definition)
    {
        $kafkaTopic = $this->producerTopic();
        $kafkaTopic->produce(RD_KAFKA_PARTITION_UA, 0, $definition->payload());
        if ($this->logger instanceof LoggerInterface) {
            $this->logger->info($definition->payload());
        }
        $this->producer->poll(0);
    }

    /**
     * @param LoggerInterface $logger
     */
    public function setLogger(LoggerInterface $logger)
    {
        $this->logger = $logger;
    }

    /**
     * @return ProducerTopic
     */
    protected function producerTopic(): ProducerTopic
    {
        $this->producer = $this->producer();
        $this->producer->setLogLevel(LOG_DEBUG);
        $this->producer->addBrokers($this->brokers);

        return $this->producer->newTopic($this->topic);
    }

    /**
     * @return KafkaProducer
     */
    protected function producer(): KafkaProducer
    {
        $conf = new Conf();
        foreach ($this->options as $key => $item) {
            $conf->set($key, $item);
        }

        return new KafkaProducer($conf);
    }
}

Command 実装

ここで指すCommandとは、artisan commandのアプリケーションではなく、
CQRSのCommandとQuery、データの書き込みと読み込みを分離して実装します。

以下は登録処理のコントローラクラスです。

<?php
declare(strict_types=1);

namespace App\Http\Controllers\Fulltext;

use App\Events\SinkConnect;
use App\Http\Controllers\Controller;
use App\Http\Requests\FulltextRequest;
use Illuminate\Contracts\Events\Dispatcher;
use Illuminate\Http\RedirectResponse;
use Illuminate\Routing\Redirector;

/**
 * Class RegisterAction
 */
final class RegisterAction extends Controller
{
    /** @var Dispatcher */
    private $dispatcher;

    /** @var Redirector */
    private $redirector;

    /**
     * RegisterAction constructor.
     *
     * @param Dispatcher $dispatcher
     * @param Redirector $redirector
     */
    public function __construct(Dispatcher $dispatcher, Redirector $redirector)
    {
        $this->dispatcher = $dispatcher;
        $this->redirector = $redirector;
    }

    /**
     * @param FulltextRequest $request
     *
     * @return RedirectResponse
     */
    public function __invoke(FulltextRequest $request): RedirectResponse
    {
        // 登録処理後に実行されるevent
        $this->dispatcher->dispatch(
            new SinkConnect(strval($request->get('fulltext')))
        );

        return $this->redirector->route('fulltext.index');
    }
}

サンプルではデータ書き込み(RDBMS)は省略していますが、
上記のコードの __invoke メソッドに記述するだけです。

何か登録処理が行われたものとして、その後にEventを発動しています。
このコントローラクラスでは、フロントで送信された文章(ブログの記事など)を保存する、
という機能を提供していますが、Kafkaへの通知はアプリケーションの要件ではなく、
システム都合の処理になりますので、ここのクラスではなく、Event Handlerが処理を行う様になっています。

このEventクラスはシンプルなクラスです。

<?php
declare(strict_types=1);

namespace App\Events;

/**
 * Class SinkConnect
 */
final class SinkConnect
{
    /** @var string */
    private $note;

    /**
     * SinkConnect constructor.
     *
     * @param string $note
     */
    public function __construct(string $note)
    {
        $this->note = $note;
    }

    /**
     * @return string
     */
    public function note(): string
    {
        return $this->note;
    }
}

このEventを処理するHandlerクラスがKafkaのメッセージを送信します。

Event Handler

SinkConnectイベントに反応して処理を行うクラスを実装します。

Handlerクラスから最初に紹介したKafka Producerを利用できる様に次の様に実装しています。

<?php
declare(strict_types=1);

namespace App\DataAccess;

use App\Foundation\Producer\Producer;
use App\Foundation\Producer\AbstractProduceDefinition;

/**
 * Class AbstractProduce
 */
abstract class AbstractProduce
{
    /** @var Producer */
    protected $producer;

    /**
     * MessageProduceUsecase constructor.
     *
     * @param Producer $producer
     */
    public function __construct(Producer $producer)
    {
        $this->producer = $producer;
    }

    /**
     * @param AbstractProduceDefinition $analyze
     */
    public function run(AbstractProduceDefinition $analyze)
    {
        $this->producer->produce($analyze);
    }
}

後述する分析処理にもKafkaを利用するためこのクラスを継承して利用します。

<?php
declare(strict_types=1);

namespace App\DataAccess;

/**
 * Class RegisterProduce
 */
final class RegisterProduce extends AbstractProduce
{

}

kafkaのtopicを処理によって切り替えるため、クラスを別クラスとして切り出しています。
このクラスを利用するHandlerクラスは以下のようになります。

<?php
declare(strict_types=1);

namespace App\Listeners;

use Ramsey\Uuid\Uuid;
use App\Events\SinkConnect;
use App\DataAccess\RegisterProduce;
use App\Definition\FulltextDefinition;

/**
 * Class SinkConnectHandler
 */
final class SinkConnectHandler
{
    /** @var RegisterProduce */
    protected $producer;

    /**
     * SinkConnectHandler constructor.
     *
     * @param RegisterProduce $producer
     */
    public function __construct(RegisterProduce $producer)
    {
        $this->producer = $producer;
    }

    /**
     * @param SinkConnect $connect
     */
    public function handle(SinkConnect $connect)
    {
        $this->producer->run(
            new FulltextDefinition(Uuid::uuid4()->toString(), $connect->note())
        );
    }
}

送信が可能な状態になりましたが、接続情報がないため、
これをServiceProviderを使って設定値を外から渡します。

設定値はconfig/kafka.phpに配置します

<?php

return [
    'topics'   => [
       'fulltext.register' => [
            'topic'   => 'fulltext.register',
            'brokers' => '127.0.0.1',
            'options' => [
                'socket.blocking.max.ms'       => '1',
                'queue.buffering.max.ms'       => '1',
                'queue.buffering.max.messages' => '1000',
                'client.id'                    => 'testingClient',
            ],
        ],
    ]
];

この設定値を使い、RegisterProduceクラスに与えます

<?php

class AppServiceProvider extends ServiceProvider
{
    /**
     * Register any application services.
     *
     * @return void
     */
    public function register()
    {
        $this->app->when(RegisterProduce::class)
            ->needs(Producer::class)
            ->give(function (Application $app) {
                $kafkaConfig = $app['config']->get('kafka');
                $topic = $kafkaConfig['topics']['fulltext.register'];
                $producer = new Producer($topic['topic'], $topic['brokers'], $topic['options']);
                $producer->setLogger($app['log']);

                return $producer;
            });
    }
}

これでEventが発動するとHandlerクラスが反応し、Kafkaへメッセージが送信されます。
格納されるKafkaのtopicは fulltext.register です。 ここで送信されるメッセージには、uuidと、フォームで入力された文字列となります。

Kafka Connectの設定

送信されたメッセージをElasticsearchに送信するためにKafka Connect Elasticsearchの設定を行います。

github.com

これはConfluentをインストールすると含まれますので、追加で入れる必要はありません。

サンプルアプリケーションにはConfluentやElasticsearchも含まれています

ここでは一台で動かすためStandaloneモードで起動させます。

connect-standalone.propertiesファイルを作成して、以下の内容を記述します。

bootstrap.servers=192.168.10.10:9092

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000

rest.port=8093

kafka connectを利用するにはconverterがいくつか種類があり、代表的なものはavroですが、
ここではjsonConverterを指定します。(avro利用例は公式を参照ください)

Elasticsearch Connector — Confluent Platform 3.3.0 documentation

このファイルを /etc/schema-registry/connect-standalone.properties として設置します。
*サンプルは設置済みです。

次にkafka connectで直接elasticsearchに接続して、インデックスにデータを追加する設定を記述します。
elasticsearch-connect.propertiesファイルを作成しkafkaのtopic情報などを記述します。

name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector

tasks.max=1
topics=fulltext.register
key.ignore=true
connection.url=http://192.168.10.10:9200

schema.ignore=true
type.name=kafka-connect

上記の内容で、elasticsearchにKafkaのtopicと同じく、fulltext.register index が作成されます。
documentの_typeは kafka-connect となります。

これを /etc/kafka-connect-elasticsearch/elasticsearch-connect.properties として設置します。

次にKafkaに上記のKafka Connectを登録します。

sudo connect-standalone -daemon /etc/schema-registry/connect-standalone.properties /etc/kafka-connect-elasticsearch/elasticsearch-connect.properties
sudo confluent load elasticsearch-sink

daemonでKafka Connect Elasticsearchを起動し、 ConfluentにConnectorをLoadして登録します。

この状態で、Laravelで実装した処理を実行するとElasticsearchのindexに挿入されていきます。

Query実装

CQRSのQuery、データの読み込みを実装します。

LaravelのElasticsearchパッケージなどでも簡単に操作ができます。
サンプルではElasticsearchのphpクライアントライブラリを利用して実装しています。

<?php
declare(strict_types=1);

namespace App\DataAccess;

use Acme\Blog\Entity\EntryCriteria;
use App\Foundation\Elasticsearch\ElasticseachClient;

/**
 * Class FulltextIndex
 */
class FulltextIndex implements EntryCriteria
{
    /** @var ElasticseachClient */
    protected $client;

    /** @var string */
    protected $index = 'fulltext.register';

    /**
     * FulltextIndex constructor.
     *
     * @param ElasticseachClient $client
     */
    public function __construct(ElasticseachClient $client)
    {
        $this->client = $client;
    }

    /**
     * @return array
     */
    public function all(): array
    {
        $result = $this->client->client()->search([
            "index"  => $this->index,
            'type'   => 'kafka-connect',
            "body"   => [
                "query" => [
                    "match_all" => new \stdClass(),
                ],
            ],
        ]);
        $map = [];
        if (count($result)) {
            foreach($result['hits']['hits'] as $hit) {
                $map[] = $hit['_source'];
            }
        }

        return $map;
    }

    public function queryBy(string $string)
    {
        // TODO: Implement queryBy() method.
    }
}

Elasticsearchに問い合わせた結果は、
src/Entry (ドメイン)配下でRepositoryなどを経由してControllerを介してhtml出力されます。
詳細な実装はサンプルコードを参照ください

<?php
declare(strict_types=1);

namespace App\Http\Controllers\Fulltext;

use Acme\Blog\Specification\ActiveEntrySpecification;
use Acme\Blog\Usecase\RetrieveEntryUsecase;
use App\Http\Controllers\Controller;
use App\Http\Responders\HtmlResponder;
use Illuminate\Http\Response;

/**
 * Class IndexAction
 */
final class IndexAction extends Controller
{
    /** @var ActiveEntrySpecification */
    private $specification;

    /** @var RetrieveEntryUsecase */
    private $usecase;

    /**
     * IndexAction constructor.
     *
     * @param ActiveEntrySpecification $specification
     * @param RetrieveEntryUsecase     $usecase
     */
    public function __construct(
        ActiveEntrySpecification $specification,
        RetrieveEntryUsecase $usecase
    ) {
        $this->specification = $specification;
        $this->usecase = $usecase;
    }

    /**
     * @param HtmlResponder $responder
     *
     * @return Response
     */
    public function __invoke(HtmlResponder $responder): Response
    {
        $responder->template('fulltext.index');

        return $responder->emit([
            'list' => $this->usecase->run($this->specification),
        ]);
    }
}

アプローチ

このサンプルではKafka Connectを使ってデータの分散を行い、
RDBMSとElasticsearchの責務を分割した例を紹介しました。

小さなアプリケーションではオーバスペックな実装ですが、
確実なデータと検索に特化したミドルウェアを組み合わせて、データ上のパフォーマンスと堅実さを提供することができます。
また分散することで、どちらかに障害が発生した場合でもアプリケーションの動作を担保したり、
障害復旧などにも活かすことができるかと思います。

次回は物理的に分散したデータベースをPrestoで集約させて、Kafka Consumer経由でElasticsearchに格納する実装例を紹介します。

PHPカンファレンス2017でApache Kafkaについて話しました

PHPカンファレンス2017

今年も参加してきました

f:id:ytakezawa:20171010005624j:plain

2017/10/08 PHPカンファレンス2017で発表に使ったスライドです

speakerdeck.com

巨大化してしまったアプリケーションを分解する場合や、
マイクロサービス化するにあたって、
こうしたメッセージミドルウェアを利用するのがアプリケーション開発の重要なポイントになっています。

コンポーネント化やDDDなどによる堅実なアプリケーションも大事な要素ではありますが、
最近では複数のデータベースを跨いでるアプリケーションや、
ユーザー向けのサービスで分析処理、
ビッグデータなどと連携するアプリケーションでKafkaを利用するケースが多くなっている傾向もあります。

PHPは多くのサービスを支えている言語です。
今後は堅実さと、より巨大なデータフローを支えるシーンが多くなると思います。

そんな場面に直面した時のヒントになればと思います。

デモがみたい!と何人かの方に言われましたので、
折角なのでLaravelとKafka Connect、Prestoなどを使ったサンプルを公開します!

f:id:ytakezawa:20171010005756j:plain

弁当、めっちゃ美味しかった・・!
実は毎年スピーカー弁当あるよ!と数年スピーカーとして参加しているのに初めて知りました・・

今年の登壇シリーズ

毎年各地のPHPカンファレンスに参加していますが、
毎年、「今年登壇するときはこれでやる!」と決めていたりしていまして、
今年は今やっているビッグデータ系をPHP側からアプローチするシリーズでした。
(福岡のカンファレンスだけはADRの話をしましたが)

そんなこんなで実は各地でやったセッション、全て見聞きすると繋がるシリーズになっています。

PHPカンファレンス関西、builderscon2017

speakerdeck.com

www.youtube.com

参加したセッション

session03: 型を意識したPHPアプリケーション開発

buildersconの時に他のセッションに参加していて途中参加だったため、最初から参加しました
とても丁寧な解説と、ライブコーディングで親切なセッションでした。
フレームワークを利用して開発していると、手軽さだけを重視してしまい、
結果的に巨大な処理になってしまい堅実さを軽視してしまいがちです。
複雑さに立ち向かうための型を理解する良いセッションでした。

session35: ChatWorkとPHPと私

弊社では自分がKafkaとHBaseなどを組み合わせて、
データ処理系のアーキテクチャを設計・開発しているところですが、
DataWorks Summit SanJose 2017の発表や、Scala移行などの話題があり、
気になっていたので参加しました。

アーキテクチャの話はありませんでしたが、実際にやった話が聞けたのでよかったです。
PHP7かHHVMか、という話もあり、
HHVMはSymfony4からは対応しなくなるなどがありますので、
Hackでも利用できるようにいくつか作ろうかなーと思いました。

session37: PHPで理解するニューラルネットワークを使った機械学習

php-ai/php-mlではなくて、自身でライブラリを作って学習させてみた、というセッション。
buildersconで "Googleが開発したニューラルネット専用LSITensor Processing Unit」" を聞いていたので、
GPU大丈夫なんだろうかと思いながら、
実践で使う場合はさすがにPHPではやりませんが...

単純に面白いなぁと思ったセッションでした。

来年は他にもいくつかPHPの大きなイベントがあるようなので楽しみです!

PHPでビッグデータを操作しよう!Presto編 2

ytake.hateblo.jp

*上記の続き

異なるデータベース、NoSQLなどを結合できるということが理解できたと思います。

それではPHPのアプリケーションから実際に利用してみましょう。

PHP Prestodb Client

PHPのPrestoクライアントライブラリは、古いものがありますが、
自分の用途に合わなかった(最新のPHP向けにしたかったなど)為、
新たに作り、公開しました。

*新しく作ったもの

github.com

*以前からある古いもの

github.com

prestoはpdoのようにネイティブのコネクタはなく、
REST APIによる操作が可能です。
以前からあるもののスタイルではなく、javaのクライアントに近い実装にしました。

早速クライアントを利用して、前回のクエリを発行します。

Install

PHP7.0以上にのみ対応していますので、7.0以上の環境で実行してください。

installはcomposerを利用してください。
下記のコマンドを実行するだけでOKです。

$ composer require ytake/php-presto-client

Usage

prestoに問い合わせるクライアントのインスタンス生成は次の通りです。

<?php

$query = "SELECT _key, _value, test_id, test_name, created_at 
FROM my_tests.testing.tests AS myttt 
INNER JOIN red_tests.test.string AS redttt ON redttt._key = myttt.test_name 
WHERE myttt.test_name = 'presto'";

$client = new \Ytake\PrestoClient\StatementClient(
    new \Ytake\PrestoClient\ClientSession('http://127.0.0.1:8080/', 'my_tests'),
    $query
);

ClientSessionクラスに接続先と、第二引数にカタログ名を指定します。
カタログ名を指定すると、そのカタログ名を省略できるようになります。

ClientSessionクラスのインスタンスをStatementClientクラスに与えると、
Prestoに問い合わせる準備が整います。

一番簡単に利用できるのは、Ytake\PrestoClient\ResultsSession 経由で結果を取得する方法です。

<?php
$resultSession = new \Ytake\PrestoClient\ResultsSession($client);
$result = $resultSession->execute()->yieldResults();

/** @var \Ytake\PrestoClient\QueryResult $row */
foreach($result as $row) {
    foreach($row->yieldData() as $yieldRow) {
        if($yieldRow instanceof \Ytake\PrestoClient\FixData) {
            var_dump($yieldRow->offsetGet('_key'), $yieldRow['_key']);
        }
    }
}

yieldResultで様々なデータが返却されます。
Prestoはデータを取得するまでに複数回リクエストをおくる必要があり、
Queryの結果取得以外にも様々なデータを分割して返却する為です。

Prestoから返却されるQueryの結果は Ytake\PrestoClient\FixData で返却されますので、
これを取得します。
このクラスは ArrayAccess を実装していますので、
一般的な配列へのアクセスと同等にカラムの値を取得できます。

前回利用したクエリの結果は次のようになります。

class Ytake\PrestoClient\FixData#20 (5) {
  public $_key =>
  string(6) "presto"
  public $_value =>
  string(7) "awesome"
  public $test_id =>
  int(1)
  public $test_name =>
  string(6) "presto"
  public $created_at =>
  string(23) "2017-09-15 03:49:30.000"
}

接続にユーザーなどが必要であれば、ClientSessionクラスのメソッドが利用できます。
Prepared Statementは現在のバージョンではサポートされていない為、
利用できませんが(ユーザーに近いフロントから利用するものではない為)
サポートされていた以前のバージョンではPreparedStatementクラスがそのまま使えると思います。

利用方法のまとめ

簡単な利用方法をまとめると次のようになります。

<?php

require_once __DIR__ . '/vendor/autoload.php';

$query = "SELECT _key, _value, test_id, test_name, created_at 
FROM my_tests.testing.tests AS myttt 
INNER JOIN red_tests.test.string AS redttt ON redttt._key = myttt.test_name 
WHERE myttt.test_name = 'presto'";

$client = new \Ytake\PrestoClient\StatementClient(
    new \Ytake\PrestoClient\ClientSession('http://127.0.0.1:8080/', 'my_tests'),
    $query
);
$resultSession = new \Ytake\PrestoClient\ResultsSession($client);
// yield results instead of returning them. Recommended.
$result = $resultSession->execute()->yieldResults();

/** @var \Ytake\PrestoClient\QueryResult $row */
foreach($result as $row) {
    foreach($row->yieldData() as $yieldRow) {
        if($yieldRow instanceof \Ytake\PrestoClient\FixData) {
            var_dump($yieldRow->offsetGet('_key'), $yieldRow['_key']);
        }
    }
}

オブジェクト マッピング

PDOのFETCH_CLASSのように、任意のオブジェクトで変更することもできます。

PHP: PDOStatement::fetch - Manual

任意のオブジェクトで返却したい場合、カラムをクラスのプロパティとして作成します。
指定したカラム以外の値は返却されませんので、
以下のようにすると、プロパティに記述した2つのカラム以外は返却されません。
尚、プロパティはpublic, protected, privateのいずれでも値が挿入されますので、
アプリケーションの設計に合わせて利用してください。

<?php

class Testing
{
    /** @var string */
    private $_key;

    /** @var string */
    private $_value;

    /**
     * @return string
     */
    public function key()
    {
        return $this->_key;
    }

    /**
     * @return string
     */
    public function value(): string
    {
        return $this->_value;
    }
}

あとはyieldObjectメソッドにクラス名を指定して値を取得します。

<?php

$resultSession = new \Ytake\PrestoClient\ResultsSession($client);
$result = $resultSession->execute()->yieldResults();

/** @var \Ytake\PrestoClient\QueryResult $row */
foreach ($result as $row) {
    foreach ($row->yieldObject(Testing::class) as $item) {
        if ($item instanceof Testing) {
            var_dump($item);
        }
    }
}

これでPHPのアプリケーションからPrestoを操作することができるようになりました。
管理画面や、分析用途に活用してみてください。