<?php
/*
* This file is part of the ONGR package.
*
* (c) NFQ Technologies UAB <info@nfq.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace ONGR\ElasticsearchBundle\Service;
use Elasticsearch\Client;
use Elasticsearch\Common\Exceptions\Missing404Exception;
use ONGR\ElasticsearchBundle\Event\Events;
use ONGR\ElasticsearchBundle\Event\BulkEvent;
use ONGR\ElasticsearchBundle\Event\CommitEvent;
use ONGR\ElasticsearchBundle\Event\PrePersistEvent;
use ONGR\ElasticsearchBundle\Exception\BulkWithErrorsException;
use ONGR\ElasticsearchBundle\Mapping\MetadataCollector;
use ONGR\ElasticsearchBundle\Result\Converter;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
use Symfony\Component\EventDispatcher\LegacyEventDispatcherProxy;
use Symfony\Component\Stopwatch\Stopwatch;
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface as ContractsEventDispatcherInterface;
/**
 * Manager class.
 *
 * Wraps an Elasticsearch client for a single index: it hands out repositories,
 * queues bulk operations and commits them, and exposes index level helpers
 * (create, drop, refresh, flush, settings, aliases).
 *
 * The event dispatcher and the stopwatch are optional collaborators injected
 * through setters; the manager works without either of them.
 */
class Manager
{
    /**
     * @var string Manager name
     */
    private $name;

    /**
     * @var array Manager configuration
     */
    private $config = [];

    /**
     * @var Client
     */
    private $client;

    /**
     * @var Converter
     */
    private $converter;

    /**
     * @var array Container for bulk queries
     */
    private $bulkQueries = [];

    /**
     * @var array Holder for consistency, refresh and replication parameters
     */
    private $bulkParams = [];

    /**
     * @var array
     */
    private $indexSettings;

    /**
     * @var MetadataCollector
     */
    private $metadataCollector;

    /**
     * After commit to make data available the refresh or flush operation is needed
     * so one of those methods has to be defined, the default is refresh.
     *
     * @var string
     */
    private $commitMode = 'refresh';

    /**
     * The number of queued bulk operations after which commit is called automatically.
     *
     * @var int
     */
    private $bulkCommitSize = 100;

    /**
     * Counts how many operations were passed to the bulk queue since the last successful commit.
     *
     * @var int
     */
    private $bulkCount = 0;

    /**
     * @var Repository[] Repository local cache, keyed by resolved class name
     */
    private $repositories = [];

    /**
     * @var EventDispatcherInterface|null
     */
    private $eventDispatcher;

    /**
     * @var Stopwatch|null
     */
    private $stopwatch;

    /**
     * @param string            $name              Manager name
     * @param array             $config            Manager configuration
     * @param Client            $client
     * @param array             $indexSettings     Index definition; the "index" key holds the index name
     * @param MetadataCollector $metadataCollector
     * @param Converter         $converter
     */
    public function __construct(
        $name,
        array $config,
        $client,
        array $indexSettings,
        $metadataCollector,
        $converter
    ) {
        $this->name = $name;
        $this->config = $config;
        $this->client = $client;
        $this->indexSettings = $indexSettings;
        $this->metadataCollector = $metadataCollector;
        $this->converter = $converter;
    }

    /**
     * Returns Elasticsearch connection.
     *
     * @return Client
     */
    public function getClient()
    {
        return $this->client;
    }

    /**
     * @return string
     */
    public function getName()
    {
        return $this->name;
    }

    /**
     * @return array
     */
    public function getConfig()
    {
        return $this->config;
    }

    /**
     * @param EventDispatcherInterface $eventDispatcher
     */
    public function setEventDispatcher(EventDispatcherInterface $eventDispatcher)
    {
        $this->eventDispatcher = $eventDispatcher;
    }

    /**
     * @param Stopwatch $stopwatch
     */
    public function setStopwatch(Stopwatch $stopwatch)
    {
        $this->stopwatch = $stopwatch;
    }

    /**
     * Returns repository by document class.
     *
     * Repositories are created on first request and cached per resolved class name.
     *
     * @param string $className FQCN or string in Bundle:Document format
     *
     * @return Repository
     *
     * @throws \InvalidArgumentException When $className is not a string.
     */
    public function getRepository($className)
    {
        if (!is_string($className)) {
            throw new \InvalidArgumentException('Document class must be a string.');
        }

        $directory = null;
        // Explicit "!== false" keeps this in line with resolveTypeName().
        if (strpos($className, ':') !== false) {
            $bundle = strstr($className, ':', true);
            if (isset($this->config['mappings'][$bundle]['document_dir'])) {
                $directory = $this->config['mappings'][$bundle]['document_dir'];
            }
        }

        $namespace = $this->getMetadataCollector()->getClassName($className, $directory);

        if (!isset($this->repositories[$namespace])) {
            $this->repositories[$namespace] = $this->createRepository($namespace);
        }

        return $this->repositories[$namespace];
    }

    /**
     * @return MetadataCollector
     */
    public function getMetadataCollector()
    {
        return $this->metadataCollector;
    }

    /**
     * @return Converter
     */
    public function getConverter()
    {
        return $this->converter;
    }

    /**
     * @return string
     */
    public function getCommitMode()
    {
        return $this->commitMode;
    }

    /**
     * @param string $commitMode One of: refresh, flush, none.
     *
     * @throws \LogicException When an unknown mode is given.
     */
    public function setCommitMode($commitMode)
    {
        if (!in_array($commitMode, ['refresh', 'flush', 'none'], true)) {
            throw new \LogicException('The commit method must be either refresh, flush or none.');
        }

        $this->commitMode = $commitMode;
    }

    /**
     * @return int
     */
    public function getBulkCommitSize()
    {
        return $this->bulkCommitSize;
    }

    /**
     * @param int $bulkCommitSize
     */
    public function setBulkCommitSize($bulkCommitSize)
    {
        // bulk() compares this value to an integer counter with "===", so a
        // numeric string (e.g. read from configuration) must be normalised
        // here or the automatic commit would never fire.
        $this->bulkCommitSize = (int) $bulkCommitSize;
    }

    /**
     * Creates a repository.
     *
     * @param string $className
     *
     * @return Repository
     */
    private function createRepository($className)
    {
        return new Repository($this, $className);
    }

    /**
     * Executes search query in the index.
     *
     * @param array $types             List of types to search in.
     * @param array $query             Query to execute.
     * @param array $queryStringParams Query parameters.
     *
     * @return array
     */
    public function search(array $types, array $query, array $queryStringParams = [])
    {
        $params = ['index' => $this->getIndexName()];

        $resolvedTypes = [];
        foreach ($types as $type) {
            $resolvedTypes[] = $this->resolveTypeName($type);
        }

        if (!empty($resolvedTypes)) {
            $params['type'] = implode(',', $resolvedTypes);
        }

        $params['body'] = $query;

        // $params is merged last, so index, type and body always win over
        // keys of the same name in the query string parameters.
        $params = array_merge($queryStringParams, $params);

        $this->stopwatch('start', 'search');
        $result = $this->client->search($params);
        $this->stopwatch('stop', 'search');

        return $result;
    }

    /**
     * Execute search queries using multisearch api.
     *
     * @param array $body Array of requests as described in the Elasticsearch Multi Search API.
     *
     * @return array
     */
    public function msearch(array $body)
    {
        return $this->client->msearch(
            [
                'index' => $this->getIndexName(), // set default index
                'body' => $body,
            ]
        );
    }

    /**
     * Adds document to next flush.
     *
     * @param object $document
     */
    public function persist($document)
    {
        $this->dispatch(
            Events::PRE_PERSIST,
            new PrePersistEvent($document)
        );

        $documentArray = $this->converter->convertToArray($document);
        $type = $this->getMetadataCollector()->getDocumentType(get_class($document));

        $this->bulk('index', $type, $documentArray);
    }

    /**
     * Adds document for removal.
     *
     * @param object $document
     *
     * @throws \LogicException When the converted document carries no _id.
     */
    public function remove($document)
    {
        $data = $this->converter->convertToArray($document, [], ['_id', '_routing']);

        if (!isset($data['_id'])) {
            throw new \LogicException(
                'In order to use remove() method document class must have property with @Id annotation.'
            );
        }

        $type = $this->getMetadataCollector()->getDocumentType(get_class($document));

        $this->bulk('delete', $type, $data);
    }

    /**
     * Flushes elasticsearch index.
     *
     * @param array $params
     *
     * @return array
     */
    public function flush(array $params = [])
    {
        return $this->client->indices()->flush(array_merge(['index' => $this->getIndexName()], $params));
    }

    /**
     * Refreshes elasticsearch index.
     *
     * @param array $params
     *
     * @return array
     */
    public function refresh(array $params = [])
    {
        return $this->client->indices()->refresh(array_merge(['index' => $this->getIndexName()], $params));
    }

    /**
     * Inserts the current query container to the index, used for bulk queries execution.
     *
     * When the bulk response reports errors the exception is thrown before the
     * queue and the counter are reset, so the queued operations stay in place
     * and are sent again by the next commit.
     *
     * @param array $params Parameters that will be passed to the flush or refresh queries.
     *
     * @return null|array Bulk response, or null when nothing was queued.
     *
     * @throws BulkWithErrorsException
     */
    public function commit(array $params = [])
    {
        if (empty($this->bulkQueries)) {
            return null;
        }

        $bulkQueries = array_merge($this->bulkQueries, $this->bulkParams);
        // The index name is read at commit time, not when operations are queued.
        $bulkQueries['index']['_index'] = $this->getIndexName();

        $this->dispatch(
            Events::PRE_COMMIT,
            new CommitEvent($this->getCommitMode(), $bulkQueries)
        );

        $this->stopwatch('start', 'bulk');
        $bulkResponse = $this->client->bulk($bulkQueries);
        $this->stopwatch('stop', 'bulk');

        if (!empty($bulkResponse['errors'])) {
            throw new BulkWithErrorsException(
                json_encode($bulkResponse),
                0,
                null,
                $bulkResponse
            );
        }

        $this->bulkQueries = [];
        $this->bulkCount = 0;

        // The "refresh" stopwatch event covers whichever commit mode runs,
        // plus the POST_COMMIT listeners.
        $this->stopwatch('start', 'refresh');

        switch ($this->getCommitMode()) {
            case 'flush':
                $this->flush($params);
                break;
            case 'refresh':
                $this->refresh($params);
                break;
        }

        $this->dispatch(
            Events::POST_COMMIT,
            new CommitEvent($this->getCommitMode(), $bulkResponse)
        );

        $this->stopwatch('stop', 'refresh');

        return $bulkResponse;
    }

    /**
     * Adds query to bulk queries container.
     *
     * A commit is triggered automatically when the number of queued operations
     * becomes exactly equal to the bulk commit size.
     *
     * @param string       $operation One of: index, update, delete, create.
     * @param string|array $type      Elasticsearch type name.
     * @param array        $query     DSL to execute.
     *
     * @throws \InvalidArgumentException
     *
     * @return null|array Response of the automatic commit, null when none was triggered.
     */
    public function bulk($operation, $type, array $query)
    {
        // Strict check: a loose comparison would accept e.g. boolean true.
        if (!in_array($operation, ['index', 'create', 'update', 'delete'], true)) {
            throw new \InvalidArgumentException('Wrong bulk operation selected');
        }

        $this->dispatch(
            Events::BULK,
            new BulkEvent($operation, $type, $query)
        );

        $meta = array_filter(
            [
                '_type' => $type,
                '_id' => isset($query['_id']) ? $query['_id'] : null,
                '_ttl' => isset($query['_ttl']) ? $query['_ttl'] : null,
                '_routing' => isset($query['_routing']) ? $query['_routing'] : null,
                '_parent' => isset($query['_parent']) ? $query['_parent'] : null,
            ],
            function ($value) {
                // Drop unset/empty entries but keep 0 and "0": a plain
                // array_filter() would discard them and queue the operation
                // without its id or routing key.
                return !empty($value) || is_numeric($value);
            }
        );

        $this->bulkQueries['body'][] = [$operation => $meta];

        unset($query['_id'], $query['_ttl'], $query['_parent'], $query['_routing']);

        // Every operation except delete is followed by its body line.
        if ($operation !== 'delete') {
            $this->bulkQueries['body'][] = $query;
        }

        // A counter is used because the number of operations is hard to derive from the bulkQueries array.
        $this->bulkCount++;

        $response = null;
        if ($this->bulkCommitSize === $this->bulkCount) {
            $response = $this->commit();
        }

        return $response;
    }

    /**
     * Optional setter to change bulk query params.
     *
     * @param array $params Possible keys:
     *                      ['consistency'] = (enum) Explicit write consistency setting for the operation.
     *                      ['refresh']     = (boolean) Refresh the index after performing the operation.
     *                      ['replication'] = (enum) Explicitly set the replication type.
     */
    public function setBulkParams(array $params)
    {
        $this->bulkParams = $params;
    }

    /**
     * Creates fresh elasticsearch index.
     *
     * @param bool $noMapping Determines if mapping should be left out.
     *
     * @return array
     */
    public function createIndex($noMapping = false)
    {
        // Work on a copy: stripping the mappings from the manager's own
        // settings would lose them for every later call.
        $settings = $this->indexSettings;

        if ($noMapping) {
            unset($settings['body']['mappings']);
        }

        return $this->getClient()->indices()->create($settings);
    }

    /**
     * Drops elasticsearch index.
     *
     * @return array
     */
    public function dropIndex()
    {
        return $this->getClient()->indices()->delete(['index' => $this->getIndexName()]);
    }

    /**
     * Tries to drop and create fresh elasticsearch index.
     *
     * @param bool $noMapping Determines if mapping should be left out.
     *
     * @return array
     */
    public function dropAndCreateIndex($noMapping = false)
    {
        try {
            if ($this->indexExists()) {
                $this->dropIndex();
            }
        } catch (\Exception $e) {
            // Do nothing, our target is to create new index.
        }

        return $this->createIndex($noMapping);
    }

    /**
     * Checks if connection index is already created.
     *
     * @return bool
     */
    public function indexExists()
    {
        return $this->getClient()->indices()->exists(['index' => $this->getIndexName()]);
    }

    /**
     * Returns index name this connection is attached to.
     *
     * @return string
     */
    public function getIndexName()
    {
        return $this->indexSettings['index'];
    }

    /**
     * Sets index name for this connection.
     *
     * @param string $name
     */
    public function setIndexName($name)
    {
        $this->indexSettings['index'] = $name;
    }

    /**
     * Returns mappings of the index for this connection.
     *
     * @return array Configured mappings, or an empty array when none are defined.
     */
    public function getIndexMappings()
    {
        return isset($this->indexSettings['body']['mappings'])
            ? $this->indexSettings['body']['mappings']
            : [];
    }

    /**
     * Returns Elasticsearch version number.
     *
     * @return string
     */
    public function getVersionNumber()
    {
        return $this->client->info()['version']['number'];
    }

    /**
     * Clears elasticsearch client cache.
     */
    public function clearCache()
    {
        $this->getClient()->indices()->clearCache(['index' => $this->getIndexName()]);
    }

    /**
     * Returns a single document by ID. Returns NULL if document was not found.
     *
     * @param string $className Document class name or Elasticsearch type name
     * @param string $id        Document ID to find
     * @param string $routing   Custom routing for the document
     *
     * @return object|null
     */
    public function find($className, $id, $routing = null)
    {
        $params = [
            'index' => $this->getIndexName(),
            'type' => $this->resolveTypeName($className),
            'id' => $id,
        ];

        // "0" is a usable routing key, so a plain truthiness test is not enough.
        if (!empty($routing) || is_numeric($routing)) {
            $params['routing'] = $routing;
        }

        try {
            $result = $this->getClient()->get($params);
        } catch (Missing404Exception $e) {
            return null;
        }

        return $this->getConverter()->convertToDocument($result, $this);
    }

    /**
     * Fetches next set of results.
     *
     * @param string $scrollId
     * @param string $scrollDuration
     *
     * @return mixed
     *
     * @throws \Exception
     */
    public function scroll(
        $scrollId,
        $scrollDuration = '5m'
    ) {
        return $this->getClient()->scroll(['scroll_id' => $scrollId, 'scroll' => $scrollDuration]);
    }

    /**
     * Clears scroll.
     *
     * @param string $scrollId
     */
    public function clearScroll($scrollId)
    {
        $this->getClient()->clearScroll(['scroll_id' => $scrollId]);
    }

    /**
     * Calls "Get Settings API" in Elasticsearch and will return you the currently configured settings.
     *
     * @return array
     */
    public function getSettings()
    {
        return $this->getClient()->indices()->getSettings(['index' => $this->getIndexName()]);
    }

    /**
     * Gets Elasticsearch aliases information.
     *
     * @param array $params Extra request parameters; they override the default index.
     *
     * @return array
     */
    public function getAliases($params = [])
    {
        return $this->getClient()->indices()->getAliases(array_merge(['index' => $this->getIndexName()], $params));
    }

    /**
     * Resolves type name by class name.
     *
     * Values containing ":" or "\" are looked up through the metadata
     * collector; anything else is returned unchanged.
     *
     * @param string $className
     *
     * @return string
     */
    private function resolveTypeName($className)
    {
        if (strpos($className, ':') !== false || strpos($className, '\\') !== false) {
            return $this->getMetadataCollector()->getDocumentType($className);
        }

        return $className;
    }

    /**
     * Starts and stops an event in the stopwatch. Does nothing when no stopwatch was set.
     *
     * @param string $action only 'start' and 'stop'
     * @param string $name   name of the event
     */
    private function stopwatch($action, $name)
    {
        if ($this->stopwatch !== null) {
            $this->stopwatch->$action('ongr_es: '.$name, 'ongr_es');
        }
    }

    /**
     * Dispatches an event, coping with both dispatcher argument orders.
     *
     * Without a dispatcher the event is returned untouched, so the manager
     * stays usable when setEventDispatcher() was never called.
     *
     * @param string $eventName
     * @param object $event
     *
     * @return mixed Whatever the dispatcher returns, or $event when no dispatcher is set.
     */
    private function dispatch($eventName, $event)
    {
        if ($this->eventDispatcher === null) {
            return $event;
        }

        // Contracts-style dispatchers take the event first, legacy ones the name first.
        if ($this->eventDispatcher instanceof ContractsEventDispatcherInterface
            || class_exists(LegacyEventDispatcherProxy::class)) {
            return $this->eventDispatcher->dispatch($event, $eventName);
        }

        return $this->eventDispatcher->dispatch($eventName, $event);
    }
}