PHP模拟supervisor的进程管理

推荐:《PHP视频教程》

前言

模拟supervisor进程管理DEMO(简易实现)

没错,是造轮子!目的在于学习!

截图:
模拟supervisor的进程管理

在图中自己实现了一个Copy子进程的功能。如果用在AMQP增减消费者时,我觉得应该会很有用。

实现

1、在主进程循环内启动子进程执行命令
2、在web输入 127.0.0.1:7865 获取子进程状态
3、socket接收请求消息,并且执行相应操作,返回web页面
4、回收子进程,防止称为僵尸进程

不足:无法持续监听错误页面。由于socket得到的响应是通过include函数加载的,所以在加载的页面内不能出现tail -f命令,否则stream就会掉入了死循环了~。我想应该有方案解决(写了socket+多进程模式,模仿fpm在接收到请求之后就启动一个子进程去处理的模式,但是执行有问题。因此将代码贴出来希望得到大家的指点)。
延伸:由于对进程可以很好的管理(期望如此),那么就可以定制化自己的一些需求,比如:(1)定制AMQP的消费者进程管理服务。(2)模拟crontab定时服务。

知识点

代码实现的过程中,有很多的细节是值得学习的。
1、在while()循环中,启用了stream的非阻塞模式。所以不能在循环中使用sleep(1),而是用stream_select($read, $write, $except, 1)让stream内部阻塞。
关于阻塞非阻塞模式,可以参阅这里
2、能够执行外部程序的函数很多,但是都稍有不同。这里采用的是proc_open,是一个很强大的函数。在这之前我曾用pcntl_exec执行过外部程序,但是需要先pcntl_fork。而用其他的如exec,shell_exec无法对子进程进行管理。
3、重启或停止等操作子进程时,只是先更改主进程中该子进程在内存中的的状态,并不是真正的对子进程操作。在统一处init()处理子进程。如此才能防止因为子进程启动时的上下文导致的一些怪异的现象。

代码

由于代码过多,所以如果你对我的方案有更好的建议可以在github这里看。

主进程代码:Process.php

<?php
require_once __DIR__ . &#39;/Consumer.php&#39;;require_once __DIR__ . &#39;/StreamConnection.php&#39;;require_once __DIR__ . &#39;/Http.php&#39;;class Process{
    /** 
     * 待启动的消费者数组
     */
    protected $consumers = array();
    protected $childPids = array();

    const PPID_FILE = __DIR__ . &#39;/process&#39;;
    protected $serializerConsumer;

    public function __construct()
    {
        $this->consumers = $this->getConsumers();
    }

    // 这里是个DEMO,实际可以用读取配置文件的方式。
    public function getConsumers()
    {
        $consumer = new Consumer([
            &#39;program&#39; => &#39;test&#39;,
            &#39;command&#39; => &#39;/usr/bin/php test.php&#39;,
            &#39;directory&#39; => __DIR__,
            &#39;logfile&#39; => __DIR__ . &#39;/test.log&#39;,
            &#39;uniqid&#39; => uniqid(),
            &#39;auto_restart&#39; => false,
        ]);
        return [
            $consumer->uniqid => $consumer,
        ];
    }

    public function run()
    {
        if (empty($this->consumers)) {
            // consumer empty
            return;
        }
        if ($this->_notifyMaster()) {
            // master alive
            return;
        }

        $pid = pcntl_fork();
        if ($pid < 0) {
            exit;
        } elseif ($pid > 0) {
            exit;
        }
        if (!posix_setsid()) {
            exit;
        }

        $stream = new StreamConnection(&#39;tcp://0.0.0.0:7865&#39;);
        @cli_set_process_title(&#39;AMQP Master Process&#39;);
        // 将主进程ID写入文件
        file_put_contents(self::PPID_FILE, getmypid());
        // master进程继续
        while (true) {
            $this->init();
            pcntl_signal_dispatch();
            $this->waitpid();
            // 如果子进程被全部回收,则主进程退出
            // if (empty($this->childPids)) {
            //     $stream->close($stream->getSocket());
            //     break;
            // }
            $stream->accept(function ($uniqid, $action) {
                $this->handle($uniqid, $action);
                return $this->display();
            });
        }
    }

    protected function init()
    {
        foreach ($this->consumers as &$c) {
            switch ($c->state) {
                case Consumer::RUNNING:
                case Consumer::STOP:
                    break;
                case Consumer::NOMINAL:
                case Consumer::STARTING:
                    $this->fork($c);
                    break;
                case Consumer::STOPING:
                    if ($c->pid && posix_kill($c->pid, SIGTERM)) {
                        $this->reset($c, Consumer::STOP);
                    }
                    break;
                case Consumer::RESTART:
                    if (empty($c->pid)) {
                        $this->fork($c);
                        break;
                    }
                    if (posix_kill($c->pid, SIGTERM)) {
                        $this->reset($c, Consumer::STOP);
                        $this->fork($c);
                    }
                    break;
                default:
                    break;
            }
        }
    }

    protected function reset(Consumer $c, $state)
    {
        $c->pid = &#39;&#39;;
        $c->uptime = &#39;&#39;;
        $c->state = $state;
        $c->process = null;
    }

    protected function waitpid()
    {
        foreach ($this->childPids as $uniqid => $pid) {
            $result = pcntl_waitpid($pid, $status, WNOHANG);
            if ($result == $pid || $result == -1) {
                unset($this->childPids[$uniqid]);
                $c = &$this->consumers[$uniqid];
                $state = pcntl_wifexited($status) ? Consumer::EXITED : Consumer::STOP;
                $this->reset($c, $state);
            }
        }
    }


    /**
     * 父进程存活情况下,只会通知父进程信息,否则可能产生多个守护进程
     */
    private function _notifyMaster()
    {
        $ppid = file_get_contents(self::PPID_FILE );
        $isAlive = $this->checkProcessAlive($ppid);
        if (!$isAlive) return false;
        return true;
    }

    public function checkProcessAlive($pid)
    {
        if (empty($pid)) return false;
        $pidinfo = `ps co pid {$pid} | xargs`;
        $pidinfo = trim($pidinfo);
        $pattern = "/.*?PID.*?(\d+).*?/";
        preg_match($pattern, $pidinfo, $matches);
        return empty($matches) ? false : ($matches[1] == $pid ? true : false);
    }

    /**
     * fork一个新的子进程
     */
    protected function fork(Consumer $c)
    {
        $descriptorspec = [2 => [&#39;file&#39;, $c->logfile, &#39;a&#39;],];
        $process = proc_open(&#39;exec &#39; . $c->command, $descriptorspec, $pipes, $c->directory);
        if ($process) {
            $ret = proc_get_status($process);
            if ($ret[&#39;running&#39;]) {
                $c->state = Consumer::RUNNING;
                $c->pid = $ret[&#39;pid&#39;];
                $c->process = $process;
                $c->uptime = date(&#39;m-d H:i&#39;);
                $this->childPids[$c->uniqid] = $ret[&#39;pid&#39;];
            } else {
                $c->state = Consumer::EXITED;
                proc_close($process);
            }
        } else {
            $c->state = Consumer::ERROR;
        }
        return $c;
    }

    public function display()
    {
        $location = &#39;http://127.0.0.1:7865&#39;;
        $basePath = Http::$basePath;
        $scriptName = isset($_SERVER[&#39;SCRIPT_NAME&#39;]) &&
            !empty($_SERVER[&#39;SCRIPT_NAME&#39;]) &&
            $_SERVER[&#39;SCRIPT_NAME&#39;] != &#39;/&#39; ? $_SERVER[&#39;SCRIPT_NAME&#39;] : &#39;/index.php&#39;;
        if ($scriptName == &#39;/index.html&#39;) {
            return Http::status_301($location);
        }

        $sourcePath = $basePath . $scriptName;
        if (!is_file($sourcePath)) {
            return Http::status_404();
        }

        ob_start();
        include $sourcePath;
        $response = ob_get_contents();
        ob_clean();

        return Http::status_200($response);
    }

    public function handle($uniqid, $action)
    {
        if (!empty($uniqid) && !isset($this->consumers[$uniqid])) {
            return;
        }
        switch ($action) {
            case &#39;refresh&#39;:
                break;
            case &#39;restartall&#39;:
                $this->killall(true);
                break;
            case &#39;stopall&#39;:
                $this->killall();
                break;
            case &#39;stop&#39;:
                $c = &$this->consumers[$uniqid];
                if ($c->state != Consumer::RUNNING) break;
                $c->state = Consumer::STOPING;
                break;
            case &#39;start&#39;:
                $c = &$this->consumers[$uniqid];
                if ($c->state == Consumer::RUNNING) break;
                $c->state = Consumer::STARTING;
                break;
            case &#39;restart&#39;:
                $c = &$this->consumers[$uniqid];
                $c->state = Consumer::RESTART;
                break;
            case &#39;copy&#39;:
                $c = $this->consumers[$uniqid];
                $newC = clone $c;
                $newC->uniqid = uniqid(&#39;C&#39;);
                $newC->state = Consumer::NOMINAL;
                $newC->pid = &#39;&#39;;
                $this->consumers[$newC->uniqid] = $newC;
                break;
            default:
                break;
        }
    }

    protected function killall($restart = false)
    {
        foreach ($this->consumers as &$c) {
            $c->state = $restart ? Consumer::RESTART : Consumer::STOPING;
        }
    }}$cli = new Process();$cli->run();

Consumer消费者对象

<?php
require_once __DIR__ . &#39;/BaseObject.php&#39;;class Consumer extends BaseObject{
    /** 开启多少个消费者 */
    public $numprocs = 1;
    /** 当前配置的唯一标志 */
    public $program;
    /** 执行的命令 */
    public $command;
    /** 当前工作的目录 */
    public $directory;

    /** 通过 $qos $queueName $duplicate 生成的 $queue */
    public $queue;
    /** 程序执行日志记录 */
    public $logfile = &#39;&#39;;
    /** 消费进程的唯一ID */
    public $uniqid;
    /** 进程IDpid */
    public $pid;
    /** 进程状态 */
    public $state = self::NOMINAL;
    /** 自启动 */
    public $auto_restart = false;

    public $process;
    /** 启动时间 */
    public $uptime;

    const RUNNING = &#39;running&#39;;
    const STOP = &#39;stoped&#39;;
    const NOMINAL = &#39;nominal&#39;;
    const RESTART = &#39;restart&#39;;
    const STOPING = &#39;stoping&#39;;
    const STARTING = &#39;stating&#39;;
    const ERROR = &#39;error&#39;;
    const BLOCKED = &#39;blocked&#39;;
    const EXITED = &#39;exited&#39;;
    const FATEL = &#39;fatel&#39;;}

stream相关代码:StreamConnection.php

<?php
class StreamConnection{
    protected $socket;
    protected $timeout = 2; //s
    protected $client;

    public function __construct($host)
    {
        $this->socket = $this->connect($host);
    }

    public function connect($host)
    {
        $socket = stream_socket_server($host, $errno, $errstr);
        if (!$socket) {
            exit(&#39;stream error&#39;);
        }
        stream_set_timeout($socket, $this->timeout);
        stream_set_chunk_size($socket, 1024);
        stream_set_blocking($socket, false);
        $this->client = [$socket];
        return $socket;
    }

    public function accept(Closure $callback)
    {
        $read = $this->client;
        if (stream_select($read, $write, $except, 1) < 1) return;
        if (in_array($this->socket, $read)) {
            $cs = stream_socket_accept($this->socket);
            $this->client[] = $cs;
        }
        foreach ($read as $s) {
            if ($s == $this->socket) continue;
            $header = fread($s, 1024);
            if (empty($header)) {
                $index = array_search($s, $this->client);
                if ($index)
                    unset($this->client[$index]);
                $this->close($s);
                continue;
            }
            Http::parse_http($header);
            $uniqid = isset($_GET[&#39;uniqid&#39;]) ? $_GET[&#39;uniqid&#39;] : &#39;&#39;;
            $action = isset($_GET[&#39;action&#39;]) ? $_GET[&#39;action&#39;] : &#39;&#39;;
            $response = $callback($uniqid, $action);
            $this->write($s, $response);
            $index = array_search($s, $this->client);
            if ($index)
                unset($this->client[$index]);
            $this->close($s);
        }
    }

    public function write($socket, $response)
    {
        $ret = fwrite($socket, $response, strlen($response));
    }

    public function close($socket)
    {
        $flag = fclose($socket);
    }

    public function getSocket()
    {
        return $this->socket;
    }}

Http响应代码:Http.php

<?php
class Http{

    public static $basePath = __DIR__ . &#39;/views&#39;;
    public static $max_age = 120; //秒

    /*
    *  函数:     parse_http
    *  描述:     解析http协议
    */
    public static function parse_http($http)
    {
        // 初始化
        $_POST = $_GET = $_COOKIE = $_REQUEST = $_SESSION = $_FILES =  array();
        $GLOBALS[&#39;HTTP_RAW_POST_DATA&#39;] = &#39;&#39;;
        // 需要设置的变量名
        $_SERVER = array(
            &#39;QUERY_STRING&#39; => &#39;&#39;,
            &#39;REQUEST_METHOD&#39; => &#39;&#39;,
            &#39;REQUEST_URI&#39; => &#39;&#39;,
            &#39;SERVER_PROTOCOL&#39; => &#39;&#39;,
            &#39;SERVER_SOFTWARE&#39; => &#39;&#39;,
            &#39;SERVER_NAME&#39; => &#39;&#39;,
            &#39;HTTP_HOST&#39; => &#39;&#39;,
            &#39;HTTP_USER_AGENT&#39; => &#39;&#39;,
            &#39;HTTP_ACCEPT&#39; => &#39;&#39;,
            &#39;HTTP_ACCEPT_LANGUAGE&#39; => &#39;&#39;,
            &#39;HTTP_ACCEPT_ENCODING&#39; => &#39;&#39;,
            &#39;HTTP_COOKIE&#39; => &#39;&#39;,
            &#39;HTTP_CONNECTION&#39; => &#39;&#39;,
            &#39;REMOTE_ADDR&#39; => &#39;&#39;,
            &#39;REMOTE_PORT&#39; => &#39;0&#39;,
            &#39;SCRIPT_NAME&#39; => &#39;&#39;,
            &#39;HTTP_REFERER&#39; => &#39;&#39;,
            &#39;CONTENT_TYPE&#39; => &#39;&#39;,
            &#39;HTTP_IF_NONE_MATCH&#39; => &#39;&#39;,
        );

        // 将header分割成数组
        list($http_header, $http_body) = explode("\r\n\r\n", $http, 2);
        $header_data = explode("\r\n", $http_header);

        list($_SERVER[&#39;REQUEST_METHOD&#39;], $_SERVER[&#39;REQUEST_URI&#39;], $_SERVER[&#39;SERVER_PROTOCOL&#39;]) = explode(&#39; &#39;, $header_data[0]);

        unset($header_data[0]);
        foreach ($header_data as $content) {
            // \r\n\r\n
            if (empty($content)) {
                continue;
            }
            list($key, $value) = explode(&#39;:&#39;, $content, 2);
            $key = strtolower($key);
            $value = trim($value);
            switch ($key) {
                case &#39;host&#39;:
                    $_SERVER[&#39;HTTP_HOST&#39;] = $value;
                    $tmp = explode(&#39;:&#39;, $value);
                    $_SERVER[&#39;SERVER_NAME&#39;] = $tmp[0];
                    if (isset($tmp[1])) {
                        $_SERVER[&#39;SERVER_PORT&#39;] = $tmp[1];
                    }
                    break;
                case &#39;cookie&#39;:
                    $_SERVER[&#39;HTTP_COOKIE&#39;] = $value;
                    parse_str(str_replace(&#39;; &#39;, &#39;&&#39;, $_SERVER[&#39;HTTP_COOKIE&#39;]), $_COOKIE);
                    break;
                case &#39;user-agent&#39;:
                    $_SERVER[&#39;HTTP_USER_AGENT&#39;] = $value;
                    break;
                case &#39;accept&#39;:
                    $_SERVER[&#39;HTTP_ACCEPT&#39;] = $value;
                    break;
                case &#39;accept-language&#39;:
                    $_SERVER[&#39;HTTP_ACCEPT_LANGUAGE&#39;] = $value;
                    break;
                case &#39;accept-encoding&#39;:
                    $_SERVER[&#39;HTTP_ACCEPT_ENCODING&#39;] = $value;
                    break;
                case &#39;connection&#39;:
                    $_SERVER[&#39;HTTP_CONNECTION&#39;] = $value;
                    break;
                case &#39;referer&#39;:
                    $_SERVER[&#39;HTTP_REFERER&#39;] = $value;
                    break;
                case &#39;if-modified-since&#39;:
                    $_SERVER[&#39;HTTP_IF_MODIFIED_SINCE&#39;] = $value;
                    break;
                case &#39;if-none-match&#39;:
                    $_SERVER[&#39;HTTP_IF_NONE_MATCH&#39;] = $value;
                    break;
                case &#39;content-type&#39;:
                    if (!preg_match(&#39;/boundary="?(\S+)"?/&#39;, $value, $match)) {
                        $_SERVER[&#39;CONTENT_TYPE&#39;] = $value;
                    } else {
                        $_SERVER[&#39;CONTENT_TYPE&#39;] = &#39;multipart/form-data&#39;;
                        $http_post_boundary = &#39;--&#39; . $match[1];
                    }
                    break;
            }
        }

        // script_name
        $_SERVER[&#39;SCRIPT_NAME&#39;] = parse_url($_SERVER[&#39;REQUEST_URI&#39;], PHP_URL_PATH);

        // QUERY_STRING
        $_SERVER[&#39;QUERY_STRING&#39;] = parse_url($_SERVER[&#39;REQUEST_URI&#39;], PHP_URL_QUERY);
        if ($_SERVER[&#39;QUERY_STRING&#39;]) {
            // $GET
            parse_str($_SERVER[&#39;QUERY_STRING&#39;], $_GET);
        } else {
            $_SERVER[&#39;QUERY_STRING&#39;] = &#39;&#39;;
        }

        // REQUEST
        $_REQUEST = array_merge($_GET, $_POST);

        return array(&#39;get&#39; => $_GET, &#39;post&#39; => $_POST, &#39;cookie&#39; => $_COOKIE, &#39;server&#39; => $_SERVER, &#39;files&#39; => $_FILES);
    }

    public static function status_404()
    {
        return <<<EOFHTTP/1.1 404 OK
content-type: text/htmlEOF;
    }

    public static function status_301($location)
    {
        return <<<EOFHTTP/1.1 301 Moved Permanently
Content-Length: 0
Content-Type: text/plain
Location: $locationCache-Control: no-cacheEOF;
    }

    public static function status_304()
    {
        return <<<EOFHTTP/1.1 304 Not Modified
Content-Length: 0EOF;
    }

    public static function status_200($response)
    {
        $contentType = $_SERVER[&#39;CONTENT_TYPE&#39;];
        $length = strlen($response);
        $header = &#39;&#39;;
        if ($contentType)
            $header = &#39;Cache-Control: max-age=180&#39;;
        return <<<EOFHTTP/1.1 200 OK
Content-Type: $contentTypeContent-Length: $length$header$responseEOF;
    }}

待执行的脚本:test.php

<?php
while(true) {
    file_put_contents(__DIR__  .  &#39;/test.log&#39;, date(&#39;Y-m-d H:i:s&#39;));
    sleep(1);}

在当前目录下的视图页面:
|- Process.php
|- Http.php
|- StreamConnection.php
|- Consumer.php
|- BaseObject.php
|- views/

更多编程相关知识,请访问:编程教学!!

以上就是PHP模拟supervisor的进程管理的详细内容,更多请关注其它相关文章!