以编程方式读取来自 systemd 日志的消息


Programmatically reading messages from systemd's journal

更新, 2013-09-12:

我已经深入研究了systemd,它journal,而且,我偶然发现了这个,它指出:

systemd-journald会将所有收到的日志消息转发到AF_UNIX SOCK_DGRAM套接字/run/systemd/journal/syslog(如果存在),Unix syslog 守护进程可以使用该套接字来进一步处理数据。

根据手册页,我确实将我的环境设置为在下面也有系统日志,我已经相应地调整了我的代码:

define('NL', "'n'r");
$log = function ()
{
    if (func_num_args() >= 1)
    {
        $message = call_user_func_array('sprintf', func_get_args());
        echo '[' . date('r') . '] ' . $message . NL; 
    }
};
$syslog = '/var/run/systemd/journal/syslog';
$sock = socket_create(AF_UNIX, SOCK_DGRAM, 0);
$connection = socket_connect($sock, $syslog);
if (!$connection)
{
    $log('Couldn''t connect to ' . $syslog);
}
else
{
    $log('Connected to ' . $syslog);
    $readables = array($sock);
    socket_set_nonblock($sock);
    while (true)
    {
        $read = $readables;
        $write = $readables;
        $except = $readables;
        $select = socket_select($read, $write, $except, 0);
        $log('Changes: %d.', $select);
        $log('-------');
        $log('Read: %d.', count($read));
        $log('Write: %d.', count($write));
        $log('Except: %d.', count($except));
        if ($select > 0)
        {
            if ($read)
            {
                foreach ($read as $readable)
                {
                    $data = socket_read($readable, 4096, PHP_BINARY_READ);
                    if ($data === false)
                    {
                        $log(socket_last_error() . ': ' . socket_strerror(socket_last_error()));
                    }
                    else if (!empty($data))
                    {
                        $log($data);
                    }
                    else
                    {
                        $log('Read empty.');
                    }
                }
            }
            if ($write)
            {
                foreach ($write as $writable)
                {
                    $data = socket_read($writable, 4096, PHP_BINARY_READ);
                    if ($data === false)
                    {
                        $log(socket_last_error() . ': ' . socket_strerror(socket_last_error()));
                    }
                    else if (!empty($data))
                    {
                        $log($data);
                    }
                    else
                    {
                        $log('Write empty.');
                    }
                }
            }
        }
    }
}

显然,这只看到(选择)write套接字上的更改。好吧,可能是这里有什么问题,所以我试图从他们那里读到,没有运气(也不应该有):


[Thu, 12 Sep 2013 14:45:15 +0300] Changes: 1.
[Thu, 12 Sep 2013 14:45:15 +0300] -------
[Thu, 12 Sep 2013 14:45:15 +0300] Read: 0.
[Thu, 12 Sep 2013 14:45:15 +0300] Write: 1.
[Thu, 12 Sep 2013 14:45:15 +0300] Except: 0. [Thu, 12 Sep 2013 14:45:15 +0300] 11: Resource temporarily unavailable

现在,这让我有点发疯。 syslog文档说这应该是可能的。代码有什么问题?

源语言:

我有一个工作原型,简单地说:

while(true)
{
    exec('journalctl -r -n 1 | more', $result, $exit);
    // do stuff
}

但这感觉不对,并且消耗了太多的系统资源,然后我发现日志有套接字。

我试图连接并从中读取:

AF_UNIX, SOCK_DGRAM : /var/run/systemd/journal/socket
AF_UNIX, SOCK_STREAM : /var/run/systemd/journal/stdout

给定的套接字。

使用 /var/run/systemd/journal/socket 时,socket_select看到 0 个变化。有了/var/run/systemd/journal/stdout我总是(每个循环)得到 1 个变化,0 字节数据。

这是我的"读者":

<?php
define('NL', "'n'r");
$journal = '/var/run/systemd/journal/socket';
$jSTDOUT = '/var/run/systemd/journal/stdout';
$journal = $jSTDOUT;
$sock = socket_create(AF_UNIX, SOCK_STREAM, 0);
$connection = @socket_connect($sock, $journal);
$log = function ($message)
{
    echo '[' . date('r') . '] ' . $message . NL; 
};
if (!$connection)
{
    $log('Couldn''t connect to ' . $journal);
}
else
{
    $log('Connected to ' . $journal);
    $readables = array($sock);
    while (true)
    {
        $read = $readables;
        if (socket_select($read, $write = NULL, $except = NULL, 0) < 1)
        {
            continue;
        }
        foreach ($read as $read_socket)
        {
            $data = @socket_read($read_socket, 1024, PHP_BINARY_READ);
            if ($data === false)
            {
                $log('Couldn''t read.');
                socket_shutdown($read_socket, 2);
                socket_close($read_socket);
                $log('Server terminated.');
                break 2;
            }
            $data = trim($data);
            if (!empty($data))
            {
                $log($data);
            }
        }
    }
    $log('Exiting.');
}

读取套接字中没有数据,我认为我做错了什么。

问题,想法:

我的目标是阅读消息,并在其中一些消息上执行回调

谁能指出我如何以编程方式阅读日记消息的正确方向?

/run/systemd/journal/ 下的套接字不适用于此 - …/socket…/stdout实际上是只写的(即用于将数据馈送到日志),而…/syslog套接字不应该被真正的系统日志以外的任何东西使用,更不用说 journald 不会通过它发送任何元数据。(事实上,默认情况下…/syslog套接字甚至不存在 - syslogd 必须实际侦听它,并且 journald 连接到它。

官方方法是直接从日志文件中读取,并使用 inotify 来监视更改(这与journalctl --follow相同,甚至tail -f /var/log/syslog用于代替轮询)。在C程序中,你可以使用libsystemd-journal中的函数,它将为你做必要的解析甚至过滤。

在其他语言中,您有三种选择:调用 C 库;自己解析日志文件(格式已记录);或分叉journalctl --follow可以告诉它输出 JSON 格式的条目(或更详细的日志导出格式)。第三个选项实际上效果很好,因为它只为整个流分叉一个进程;我已经为它编写了一个PHP包装器(见下文)。

最近的systemd版本(v193)也附带systemd-journal-gatewayd,它本质上是基于HTTP的journalctl版本;也就是说,你可以在http://localhost:19531/entries获得一个JSON或日志导出流。(网关journalctl 甚至都支持服务器发送的事件,以便从 HTML 5 网页访问流。但是,由于明显的安全问题,默认情况下禁用网关


附件:用于journalctl --follow的PHP包装器

<?php
/* © 2013 Mantas Mikulėnas <grawity@gmail.com>
 * Released under the MIT Expat License <https://opensource.org/licenses/MIT>
 */
/* Iterator extends Traversable {
    void    rewind()
    boolean valid()
    void    next()
    mixed   current()
    scalar  key()
}
calls:  rewind, valid==true, current, key
    next, valid==true, current, key
    next, valid==false
*/
class Journal implements Iterator {
    private $filter;
    private $startpos;
    private $proc;
    private $stdout;
    private $entry;
    static function _join_argv($argv) {
        return implode(" ",
            array_map(function($a) {
                return strlen($a) ? escapeshellarg($a) : "''";
            }, $argv));
    }
    function __construct($filter=[], $cursor=null) {
        $this->filter = $filter;
        $this->startpos = $cursor;
    }
    function _close_journal() {
        if ($this->stdout) {
            fclose($this->stdout);
            $this->stdout = null;
        }
        if ($this->proc) {
            proc_close($this->proc);
            $this->proc = null;
        }
        $this->entry = null;
    }
    function _open_journal($filter=[], $cursor=null) {
        if ($this->proc)
            $this->_close_journal();
        $this->filter = $filter;
        $this->startpos = $cursor;
        $cmd = ["journalctl", "-f", "-o", "json"];
        if ($cursor) {
            $cmd[] = "-c";
            $cmd[] = $cursor;
        }
        $cmd = array_merge($cmd, $filter);
        $cmd = self::_join_argv($cmd);
        $fdspec = [
            0 => ["file", "/dev/null", "r"],
            1 => ["pipe", "w"],
            2 => ["file", "/dev/null", "w"],
        ];
        $this->proc = proc_open($cmd, $fdspec, $fds);
        if (!$this->proc)
            return false;
        $this->stdout = $fds[1];
    }
    function seek($cursor) {
        $this->_open_journal($this->filter, $cursor);
    }
    function rewind() {
        $this->seek($this->startpos);
    }
    function next() {
        $line = fgets($this->stdout);
        if ($line === false)
            $this->entry = false;
        else
            $this->entry = json_decode($line);
    }
    function valid() {
        return ($this->entry !== false);
        /* null is valid, it just means next() hasn't been called yet */
    }
    function current() {
        if (!$this->entry)
            $this->next();
        return $this->entry;
    }
    function key() {
        if (!$this->entry)
            $this->next();
        return $this->entry->__CURSOR;
    }
}
$a = new Journal();
foreach ($a as $cursor => $item) {
    echo "================'n";
    var_dump($cursor);
    //print_r($item);
    if ($item)
        var_dump($item->MESSAGE);
}