<?php

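/**
 * @file
 * PubSubHubbub hub: endpoint menu callback, cron hooks, subscriber
 * notification (batched or queued) and subscription storage.
 *
 * @todo Support notifications from publishers over HTTP.
 */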

// The PuSHHub class and PuSHHubSubscriptionsInterface used below are not
// defined in this file; presumably they come from this include.
include_once 'PuSHHub.inc';
// Name of the queue that holds pending subscriber notifications; used by
// push_hub_queue() and push_hub_cron_queue_info().
define('PUSH_HUB_NOTIFICATIONS_QUEUE', 'push_hub_notifications');

/**
 * Implements hook_enable().
 *
 * Stores 'PuSHHub' in the 'queue_module_<queue name>' variable of the
 * notifications queue.
 *
 * NOTE(review): Drupal 7 core's DrupalQueue::get() looks up
 * 'queue_class_<queue name>'; confirm that something still reads this
 * 'queue_module_' variable.
 */
function push_hub_enable() {
  $variable = 'queue_module_' . PUSH_HUB_NOTIFICATIONS_QUEUE;
  variable_set($variable, 'PuSHHub');
}

/**
 * Implements hook_menu().
 *
 * Registers the hub endpoint at pubsubhubbub/endpoint, served by
 * push_hub_endpoint() from push_hub.pages.inc.
 *
 * NOTE(review): the page arguments are path positions 3 and 4; position 2
 * (the first component after the registered path) is not passed on. Confirm
 * this is what push_hub_endpoint() expects.
 *
 * @return
 *   An array of menu items keyed by path.
 */
function push_hub_menu() {
  return array(
    'pubsubhubbub/endpoint' => array(
      'page callback' => 'push_hub_endpoint',
      'page arguments' => array(3, 4),
      'access callback' => 'user_access',
      // @todo Specific permission
      'access arguments' => array('access content'),
      'file' => 'push_hub.pages.inc',
      'type' => MENU_CALLBACK,
    ),
  );
}

/**
 * Implements hook_cron().
 *
 * Asks the hub to expire subscriptions on every cron run.
 */
function push_hub_cron() {
  $hub = push_hub();
  $hub->expire();
}

/**
 * Implements hook_cron_queue_info().
 *
 * Declares the notifications queue: each item is handed to
 * push_hub_notify_subscriber(), with a 'time' value of 15.
 *
 * @return
 *   An array of queue definitions keyed by queue name.
 */
function push_hub_cron_queue_info() {
  return array(
    PUSH_HUB_NOTIFICATIONS_QUEUE => array(
      'worker callback' => 'push_hub_notify_subscriber',
      'time' => 15,
    ),
  );
}

/**
 * Implements hook_drupal_queue_load_classes().
 *
 * Loads PuSHHubQueue.inc from the push_hub module directory.
 */
function push_hub_drupal_queue_load_classes() {
  $module = 'push_hub';
  module_load_include('inc', $module, 'PuSHHubQueue');
}

/**
 * Notify all subscribers of a topic.
 *
 * @param $topic
 *   URL of the topic that changed.
 * @param $changed
 *   The full or partial feed that contains the changed elements. If NULL,
 *   a light ping (without any content) will be issued to the subscriber and
 *   the subscriber is expected to fetch content from the publisher. Light
 *   pings do not conform to the PubSubHubbub spec.
 * @param $batch
 *   Set to TRUE to use browser based batching (Batch API), otherwise
 *   notifications are queued and executed on cron.
 */
function push_hub_notify($topic, $changed = NULL, $batch = FALSE) {
  if ($batch) {
    watchdog('push_hub', 'notifying all subscribers batched', NULL, WATCHDOG_DEBUG);
    $operations = array();
    foreach (push_hub()->allSubscribers($topic) as $subscriber) {
      // Subscriber URLs are supplied from outside; pass them as placeholder
      // values instead of concatenating them into the log message.
      watchdog('push_hub', 'notifying subscriber batched @subscriber', array('@subscriber' => $subscriber), WATCHDOG_DEBUG);
      // One batch operation per subscriber; the single operation argument is
      // the $info array consumed by push_hub_notify_subscriber().
      $operations[] = array(
        'push_hub_notify_subscriber_batchapi',
        array(
          array(
            'topic' => $topic,
            'subscriber' => $subscriber,
            'changed' => $changed,
          ),
        ),
      );
    }
    // Build the batch definition inline rather than overwriting the $batch
    // flag with it.
    batch_set(array(
      'title' => t('Notifying subscribers'),
      'operations' => $operations,
    ));
    return;
  }

  watchdog('push_hub', 'notifying all subscribers queued', NULL, WATCHDOG_DEBUG);
  foreach (push_hub()->allSubscribers($topic) as $subscriber) {
    watchdog('push_hub', 'notifying subscriber queued @subscriber', array('@subscriber' => $subscriber), WATCHDOG_DEBUG);
    push_hub_queue()->createItem(array(
      'topic' => $topic,
      'subscriber' => $subscriber,
      'changed' => $changed,
    ));
  }
}

/**
 * Batch API callback for push_hub_notify_subscriber().
 *
 * @param $info
 *   Notification info array, passed unchanged to push_hub_notify_subscriber().
 * @param $context
 *   Batch context array, taken by reference; its 'finished' key is set here.
 */
function push_hub_notify_subscriber_batchapi($info, &$context) {
  push_hub_notify_subscriber($info);
  // Each operation notifies exactly one subscriber, so it is done after a
  // single pass.
  $context['finished'] = TRUE;
}

/**
 * Notify a single subscriber; used as queue worker and by the batch callback.
 *
 * If the hub reports failure, the same item is put back on the notifications
 * queue with a second createItem() argument of five minutes from the current
 * request time.
 *
 * @param $info
 *   An array with the keys 'topic' (topic URL) and 'subscriber' (subscriber
 *   callback URL), and an optional key 'changed' containing the changed
 *   content.
 */
function push_hub_notify_subscriber($info) {
  $changed = isset($info['changed']) ? $info['changed'] : NULL;
  if (push_hub()->notify($info['topic'], $info['subscriber'], $changed)) {
    return;
  }
  // On failure, queue for later notification.
  push_hub_queue()->createItem($info, REQUEST_TIME + 5 * 60);
}

/**
 * Returns the Drupal queue used for push_hub notifications.
 *
 * @return
 *   The queue object DrupalQueue::get() provides for
 *   PUSH_HUB_NOTIFICATIONS_QUEUE.
 */
function push_hub_queue() {
  $queue = DrupalQueue::get(PUSH_HUB_NOTIFICATIONS_QUEUE);
  return $queue;
}

/**
 * Returns the PuSHHub object, backed by the PuSHHubSubscriptions storage.
 */
function push_hub() {
  $subscriptions = PuSHHubSubscriptions::instance();
  return PuSHHub::instance($subscriptions);
}

/**
 * Database-backed subscription storage for the hub.
 *
 * All methods operate on the {push_hub_subscriptions} table.
 */
class PuSHHubSubscriptions implements PuSHHubSubscriptionsInterface {
  /**
   * Singleton accessor.
   *
   * Declared static because push_hub() calls it as
   * PuSHHubSubscriptions::instance(); calling a non-static method statically
   * is an error on current PHP versions.
   *
   * @return
   *   The shared PuSHHubSubscriptions object, created on first use.
   */
  public static function instance() {
    static $subscriptions;
    if (empty($subscriptions)) {
      $subscriptions = new PuSHHubSubscriptions();
    }
    return $subscriptions;
  }

  /**
   * Protect constructor so that instances are only created by instance().
   */
  protected function __construct() { }

  /**
   * Save a subscription.
   *
   * Any existing row for the same topic and subscriber is deleted first, so a
   * repeated save replaces the secret and refreshes the timestamp.
   *
   * @param $topic
   *   The topic URL.
   * @param $subscriber
   *   The subscriber callback URL.
   * @param $secret
   *   The shared secret to store with the subscription.
   */
  public function save($topic, $subscriber, $secret) {
    $this->delete($topic, $subscriber);
    $subscription = array(
      'topic' => $topic,
      'subscriber' => $subscriber,
      'secret' => $secret,
      'timestamp' => REQUEST_TIME,
    );
    drupal_write_record('push_hub_subscriptions', $subscription);
  }

  /**
   * Delete a subscription.
   *
   * @param $topic
   *   The topic URL.
   * @param $subscriber
   *   The subscriber callback URL.
   */
  public function delete($topic, $subscriber) {
    db_delete('push_hub_subscriptions')
      ->condition('topic', $topic)
      ->condition('subscriber', $subscriber)
      ->execute();
  }

  /**
   * Find all subscriber URLs for a given topic URL.
   *
   * @param $topic
   *   The topic URL.
   *
   * @return
   *   An array of subscriber URLs; empty if the topic has no subscribers.
   */
  public function all($topic) {
    $subscribers = array();
    $result = db_query("SELECT subscriber FROM {push_hub_subscriptions} WHERE topic = :topic", array(':topic' => $topic));
    while ($row = $result->fetchObject()) {
      $subscribers[] = $row->subscriber;
    }
    return $subscribers;
  }

  /**
   * Retrieve a shared secret.
   *
   * @param $topic
   *   The topic URL.
   * @param $subscriber
   *   The subscriber callback URL.
   *
   * @return
   *   The result of fetchField() on the secret column for the matching row.
   */
  public function secret($topic, $subscriber) {
    return db_query("SELECT secret FROM {push_hub_subscriptions} WHERE topic = :topic AND subscriber = :subscriber", array(':topic' => $topic, ':subscriber' => $subscriber))->fetchField();
  }

  /**
   * Expire subscriptions.
   *
   * @param $time
   *   Maximum age; rows whose timestamp is lower than REQUEST_TIME - $time are
   *   deleted.
   */
  public function expire($time) {
    db_delete('push_hub_subscriptions')
      ->condition('timestamp', REQUEST_TIME - $time, '<')
      ->execute();
  }
}
