Socket.ioで通知を実装する

今回リアルタイムでWebブラウザに通知を送る簡単なサンプルを書いてみました。Socket.ioはチャットのサンプル(特にブロードキャストの)は腐るほどあるように思うのですが、ユーザー個別の通信だったり通知の実装は案外サンプルを見かけなかったように思います。

サンプルの内容

内容としては、ユーザーIDを指定してメッセージ(通知)を送ると該当するユーザーのブラウザのみでそのメッセージを受け取る事ができるという簡単なものになります。server.jsに主な処理が書かれています。いちおうスケールアウトした場合の事を考えてRedisを利用するようになっています(Socket.ioはスケールアウトする場合はRedisをsocket.io-redisやsocoket.io-emitter等のRedisをバックエンドで利用するライブラリを利用します)。

ちなみに今回Socket.ioを素の状態で使ったのは初めてでした(今までSails.jsだったりLaravelのブロードキャストイベントだったり組み合わせ利用のみ)。思ったのはスケールアウトする場合(Redis使う場合)としない場合でけっこうコードが変わりますね。主に違いが出るのはRedis設定系は当たり前ですが、ユーザーIDとSocketのIDの紐付けを分散環境で管理する必要が出てきたのでそこが少し手間になりました(サンプルだとredis-helper.jsの中でまとめています)。

スケールアウトを考慮する場合

サンプルで実装しているのはこちらの状態になります。

import os from 'os';
import Koa from 'koa';
import serve from 'koa-static';
import RedisHelper from './redis-helper';

const REDIS_HOST = 'localhost';
const REDIS_PORT = 6379;

const app = new Koa();
const router = require('koa-router')();

app.use(serve('./public'));
app.use(router.routes());

/**
 * Server
 */
const server = app.listen(process.argv[2]);
console.log(`Listening ${process.argv[2]}...`);
const io = require('socket.io').listen(server);

/**
 * Redis
 */
const emitter = require('socket.io-emitter')({ host: REDIS_HOST, port: REDIS_PORT });
const redis = require('redis').createClient;
const pub = redis(REDIS_PORT, REDIS_HOST);
const sub = redis(REDIS_PORT, REDIS_HOST);
const redisAdapter = require('socket.io-redis');

io.adapter(redisAdapter({ host: REDIS_HOST, port: REDIS_PORT, pubClient: pub, subClient: sub }));
io.use(middlewareAttachUserId);

const helper = new RedisHelper(redis(REDIS_PORT, REDIS_HOST), os.hostname(), process.argv[2]);
helper.init();

/**
 * Events
 */
io.on('connection', (socket) => {
  console.log('connection', socket.id, socket.userId);
  helper.set(socket.userId, socket.id);
  socket.on('disconnect', () => {
    console.log('disconnect', socket.id, socket.userId);
    helper.del(socket.userId, socket.id);
  });
});

/**
 * Routing
 */
router.get('/message/to/:id/:message', async (ctx, next) => {
  await helper.fetchSocketIds(ctx.params.id)
    .then(socketIds => {
      console.log('socketIds', socketIds);
      ctx.body = `<p>Sent message to socket ids [${socketIds.join(', ')}]</p>`;
      socketIds.forEach((socketId) => {
        emitter.to(socketId).emit('push_message', `${ctx.params.message} - ${new Date()}`);
      });
    })
    .catch(err => {
      console.log(err);
      ctx.body = err;
    });
});

function middlewareAttachUserId (socket, next) {
  // TODO: Using token instead of real userId in this sample.
  socket.userId = socket.handshake.query.token;
  next(); // MUST call next() in middleware
}

下記の箇所で指定されたユーザーIDからSocketのIDを特定して送信を行っています。

  helper.fetchSocketIds(ctx.params.id)
    .then(socketIds => {
      console.log('socketIds', socketIds);
      ctx.body = `<p>Sent message to socket ids [${socketIds.join(', ')}]</p>`;
      socketIds.forEach((socketId) => {
        emitter.to(socketId).emit('push_message', `${ctx.params.message} - ${new Date()}`);
      });
    })
    .catch(err => {
      console.log(err);
      ctx.body = err;
    });

emitter.to(socketId).emit('push_message',${ctx.params.message} – ${new Date()});という箇所が実際の送信箇所です。

スケールアウトを考慮しない場合

スケールアウトを考慮しない場合だとかなりコードが簡略化されます。

import Koa from 'koa';
import serve from 'koa-static';
import _ from 'lodash';

const app = new Koa();
const router = require('koa-router')();

app.use(serve('./public'));
app.use(router.routes());

const server = app.listen(3000);
console.log('Listening 3000...');
const io = require('socket.io').listen(server);

io.on('connection', (socket) => {
  console.log('connection', socket.id);
});
io.use(middlewareAttachUserId);

router.get('/message/to/:id/:message', async (ctx, next) => {
  ctx.body = 'Sent a message to user '+ ctx.params.id;
  const sockets = _.toArray(io.sockets.connected);
  sockets.filter((socket) => {
    return +socket.userId === +ctx.params.id;
  }).forEach((socket) => {
    io.to(socket.id).emit('push_message', `${ctx.params.message} - ${new Date()}`);
  });
});

function middlewareAttachUserId (socket, next) {
  // TODO: Using token instead of real userId in this sample.
  socket.userId = socket.handshake.query.token;
  next(); // MUST call next() in middleware
}

こちらの場合はsocket.io-emitterを使う必要がないので、通常のioからemitする形になっています。

  sockets.filter((socket) => {
    return +socket.userId === +ctx.params.id;
  }).forEach((socket) => {
    io.to(socket.id).emit('push_message', `${ctx.params.message} - ${new Date()}`);
  });