vendor/handcraftedinthealps/elasticsearch-bundle/Service/Manager.php line 305

Open in your IDE?
  1. <?php
  2. /*
  3.  * This file is part of the ONGR package.
  4.  *
  5.  * (c) NFQ Technologies UAB <info@nfq.com>
  6.  *
  7.  * For the full copyright and license information, please view the LICENSE
  8.  * file that was distributed with this source code.
  9.  */
  10. namespace ONGR\ElasticsearchBundle\Service;
  11. use Elasticsearch\Client;
  12. use Elasticsearch\Common\Exceptions\Missing404Exception;
  13. use ONGR\ElasticsearchBundle\Event\Events;
  14. use ONGR\ElasticsearchBundle\Event\BulkEvent;
  15. use ONGR\ElasticsearchBundle\Event\CommitEvent;
  16. use ONGR\ElasticsearchBundle\Event\PrePersistEvent;
  17. use ONGR\ElasticsearchBundle\Exception\BulkWithErrorsException;
  18. use ONGR\ElasticsearchBundle\Mapping\MetadataCollector;
  19. use ONGR\ElasticsearchBundle\Result\Converter;
  20. use Symfony\Component\EventDispatcher\EventDispatcherInterface;
  21. use Symfony\Component\EventDispatcher\LegacyEventDispatcherProxy;
  22. use Symfony\Component\Stopwatch\Stopwatch;
  23. use Symfony\Contracts\EventDispatcher\EventDispatcherInterface as ContractsEventDispatcherInterface;
  24. /**
  25.  * Manager class.
  26.  */
  27. class Manager
  28. {
  29.     /**
  30.      * @var string Manager name
  31.      */
  32.     private $name;
  33.     /**
  34.      * @var array Manager configuration
  35.      */
  36.     private $config = [];
  37.     /**
  38.      * @var Client
  39.      */
  40.     private $client;
  41.     /**
  42.      * @var Converter
  43.      */
  44.     private $converter;
  45.     /**
  46.      * @var array Container for bulk queries
  47.      */
  48.     private $bulkQueries = [];
  49.     /**
  50.      * @var array Holder for consistency, refresh and replication parameters
  51.      */
  52.     private $bulkParams = [];
  53.     /**
  54.      * @var array
  55.      */
  56.     private $indexSettings;
  57.     /**
  58.      * @var MetadataCollector
  59.      */
  60.     private $metadataCollector;
  61.     /**
  62.      * After commit to make data available the refresh or flush operation is needed
  63.      * so one of those methods has to be defined, the default is refresh.
  64.      *
  65.      * @var string
  66.      */
  67.     private $commitMode 'refresh';
  68.     /**
  69.      * The size that defines after how much document inserts call commit function.
  70.      *
  71.      * @var int
  72.      */
  73.     private $bulkCommitSize 100;
  74.     /**
  75.      * Container to count how many documents was passed to the bulk query.
  76.      *
  77.      * @var int
  78.      */
  79.     private $bulkCount 0;
  80.     /**
  81.      * @var Repository[] Repository local cache
  82.      */
  83.     private $repositories;
  84.     /**
  85.      * @var EventDispatcherInterface
  86.      */
  87.     private $eventDispatcher;
  88.     /**
  89.      * @var Stopwatch
  90.      */
  91.     private $stopwatch;
  92.     /**
  93.      * @param string            $name              Manager name
  94.      * @param array             $config            Manager configuration
  95.      * @param Client            $client
  96.      * @param array             $indexSettings
  97.      * @param MetadataCollector $metadataCollector
  98.      * @param Converter         $converter
  99.      */
  100.     public function __construct(
  101.         $name,
  102.         array $config,
  103.         $client,
  104.         array $indexSettings,
  105.         $metadataCollector,
  106.         $converter
  107.     ) {
  108.         $this->name $name;
  109.         $this->config $config;
  110.         $this->client $client;
  111.         $this->indexSettings $indexSettings;
  112.         $this->metadataCollector $metadataCollector;
  113.         $this->converter $converter;
  114.     }
  115.     /**
  116.      * Returns Elasticsearch connection.
  117.      *
  118.      * @return Client
  119.      */
  120.     public function getClient()
  121.     {
  122.         return $this->client;
  123.     }
  124.     /**
  125.      * @return string
  126.      */
  127.     public function getName()
  128.     {
  129.         return $this->name;
  130.     }
  131.     /**
  132.      * @return array
  133.      */
  134.     public function getConfig()
  135.     {
  136.         return $this->config;
  137.     }
  138.     /**
  139.      * @param EventDispatcherInterface $eventDispatcher
  140.      */
  141.     public function setEventDispatcher(EventDispatcherInterface $eventDispatcher)
  142.     {
  143.         $this->eventDispatcher $eventDispatcher;
  144.     }
  145.     /**
  146.      * @param Stopwatch $stopwatch
  147.      */
  148.     public function setStopwatch(Stopwatch $stopwatch)
  149.     {
  150.         $this->stopwatch $stopwatch;
  151.     }
  152.     /**
  153.      * Returns repository by document class.
  154.      *
  155.      * @param string $className FQCN or string in Bundle:Document format
  156.      *
  157.      * @return Repository
  158.      */
  159.     public function getRepository($className)
  160.     {
  161.         if (!is_string($className)) {
  162.             throw new \InvalidArgumentException('Document class must be a string.');
  163.         }
  164.         $directory null;
  165.         if (strpos($className':')) {
  166.             $bundle explode(':'$className)[0];
  167.             if (isset($this->config['mappings'][$bundle]['document_dir'])) {
  168.                 $directory $this->config['mappings'][$bundle]['document_dir'];
  169.             }
  170.         }
  171.         $namespace $this->getMetadataCollector()->getClassName($className$directory);
  172.         if (isset($this->repositories[$namespace])) {
  173.             return $this->repositories[$namespace];
  174.         }
  175.         $repository $this->createRepository($namespace);
  176.         $this->repositories[$namespace] = $repository;
  177.         return $repository;
  178.     }
  179.     /**
  180.      * @return MetadataCollector
  181.      */
  182.     public function getMetadataCollector()
  183.     {
  184.         return $this->metadataCollector;
  185.     }
  186.     /**
  187.      * @return Converter
  188.      */
  189.     public function getConverter()
  190.     {
  191.         return $this->converter;
  192.     }
  193.     /**
  194.      * @return string
  195.      */
  196.     public function getCommitMode()
  197.     {
  198.         return $this->commitMode;
  199.     }
  200.     /**
  201.      * @param string $commitMode
  202.      */
  203.     public function setCommitMode($commitMode)
  204.     {
  205.         if ($commitMode === 'refresh' || $commitMode === 'flush' || $commitMode === 'none') {
  206.             $this->commitMode $commitMode;
  207.         } else {
  208.             throw new \LogicException('The commit method must be either refresh, flush or none.');
  209.         }
  210.     }
  211.     /**
  212.      * @return int
  213.      */
  214.     public function getBulkCommitSize()
  215.     {
  216.         return $this->bulkCommitSize;
  217.     }
  218.     /**
  219.      * @param int $bulkCommitSize
  220.      */
  221.     public function setBulkCommitSize($bulkCommitSize)
  222.     {
  223.         $this->bulkCommitSize $bulkCommitSize;
  224.     }
  225.     /**
  226.      * Creates a repository.
  227.      *
  228.      * @param string $className
  229.      *
  230.      * @return Repository
  231.      */
  232.     private function createRepository($className)
  233.     {
  234.         return new Repository($this$className);
  235.     }
  236.     /**
  237.      * Executes search query in the index.
  238.      *
  239.      * @param array $types             List of types to search in.
  240.      * @param array $query             Query to execute.
  241.      * @param array $queryStringParams Query parameters.
  242.      *
  243.      * @return array
  244.      */
  245.     public function search(array $types, array $query, array $queryStringParams = [])
  246.     {
  247.         $params = [];
  248.         $params['index'] = $this->getIndexName();
  249.         $resolvedTypes = [];
  250.         foreach ($types as $type) {
  251.             $resolvedTypes[] = $this->resolveTypeName($type);
  252.         }
  253.         if (!empty($resolvedTypes)) {
  254.             $params['type'] = implode(','$resolvedTypes);
  255.         }
  256.         $params['body'] = $query;
  257.         if (!empty($queryStringParams)) {
  258.             $params array_merge($queryStringParams$params);
  259.         }
  260.         $this->stopwatch('start''search');
  261.         $result $this->client->search($params);
  262.         $this->stopwatch('stop''search');
  263.         return $result;
  264.     }
  265.     /**
  266.      * Execute search queries using multisearch api
  267.      * $body - is array of requests described in elastic Multi Search API
  268.      *
  269.      * @param $body
  270.      * @return array
  271.      */
  272.     public function msearch(array $body)
  273.     {
  274.         $result $this->client->msearch(
  275.             [
  276.                 'index' => $this->getIndexName(), // set default index
  277.                 'body' => $body
  278.             ]
  279.         );
  280.         return $result;
  281.     }
  282.     /**
  283.      * Adds document to next flush.
  284.      *
  285.      * @param object $document
  286.      */
  287.     public function persist($document)
  288.     {
  289.         $this->dispatch(
  290.             Events::PRE_PERSIST,
  291.             new PrePersistEvent($document)
  292.         );
  293.         $documentArray $this->converter->convertToArray($document);
  294.         $type $this->getMetadataCollector()->getDocumentType(get_class($document));
  295.         $this->bulk('index'$type$documentArray);
  296.     }
  297.     /**
  298.      * Adds document for removal.
  299.      *
  300.      * @param object $document
  301.      */
  302.     public function remove($document)
  303.     {
  304.         $data $this->converter->convertToArray($document, [], ['_id''_routing']);
  305.         if (!isset($data['_id'])) {
  306.             throw new \LogicException(
  307.                 'In order to use remove() method document class must have property with @Id annotation.'
  308.             );
  309.         }
  310.         $type $this->getMetadataCollector()->getDocumentType(get_class($document));
  311.         $this->bulk('delete'$type$data);
  312.     }
  313.     /**
  314.      * Flushes elasticsearch index.
  315.      *
  316.      * @param array $params
  317.      *
  318.      * @return array
  319.      */
  320.     public function flush(array $params = [])
  321.     {
  322.         return $this->client->indices()->flush(array_merge(['index' => $this->getIndexName()], $params));
  323.     }
  324.     /**
  325.      * Refreshes elasticsearch index.
  326.      *
  327.      * @param array $params
  328.      *
  329.      * @return array
  330.      */
  331.     public function refresh(array $params = [])
  332.     {
  333.         return $this->client->indices()->refresh(array_merge(['index' => $this->getIndexName()], $params));
  334.     }
  335.     /**
  336.      * Inserts the current query container to the index, used for bulk queries execution.
  337.      *
  338.      * @param array $params Parameters that will be passed to the flush or refresh queries.
  339.      *
  340.      * @return null|array
  341.      *
  342.      * @throws BulkWithErrorsException
  343.      */
  344.     public function commit(array $params = [])
  345.     {
  346.         if (!empty($this->bulkQueries)) {
  347.             $bulkQueries array_merge($this->bulkQueries$this->bulkParams);
  348.             $bulkQueries['index']['_index'] = $this->getIndexName();
  349.             $this->dispatch(
  350.                 Events::PRE_COMMIT,
  351.                 new CommitEvent($this->getCommitMode(), $bulkQueries)
  352.             );
  353.             $this->stopwatch('start''bulk');
  354.             $bulkResponse $this->client->bulk($bulkQueries);
  355.             $this->stopwatch('stop''bulk');
  356.             if (isset($bulkResponse['errors']) && $bulkResponse['errors']) {
  357.                 throw new BulkWithErrorsException(
  358.                     json_encode($bulkResponse),
  359.                     0,
  360.                     null,
  361.                     $bulkResponse
  362.                 );
  363.             }
  364.             $this->bulkQueries = [];
  365.             $this->bulkCount 0;
  366.             $this->stopwatch('start''refresh');
  367.             switch ($this->getCommitMode()) {
  368.                 case 'flush':
  369.                     $this->flush($params);
  370.                     break;
  371.                 case 'refresh':
  372.                     $this->refresh($params);
  373.                     break;
  374.             }
  375.             $this->dispatch(
  376.                 Events::POST_COMMIT,
  377.                 new CommitEvent($this->getCommitMode(), $bulkResponse)
  378.             );
  379.             $this->stopwatch('stop''refresh');
  380.             return $bulkResponse;
  381.         }
  382.         return null;
  383.     }
  384.     /**
  385.      * Adds query to bulk queries container.
  386.      *
  387.      * @param string       $operation One of: index, update, delete, create.
  388.      * @param string|array $type      Elasticsearch type name.
  389.      * @param array        $query     DSL to execute.
  390.      *
  391.      * @throws \InvalidArgumentException
  392.      *
  393.      * @return null|array
  394.      */
  395.     public function bulk($operation$type, array $query)
  396.     {
  397.         if (!in_array($operation, ['index''create''update''delete'])) {
  398.             throw new \InvalidArgumentException('Wrong bulk operation selected');
  399.         }
  400.         $this->dispatch(
  401.             Events::BULK,
  402.             new BulkEvent($operation$type$query)
  403.         );
  404.         $this->bulkQueries['body'][] = [
  405.             $operation => array_filter(
  406.                 [
  407.                     '_type' => $type,
  408.                     '_id' => isset($query['_id']) ? $query['_id'] : null,
  409.                     '_ttl' => isset($query['_ttl']) ? $query['_ttl'] : null,
  410.                     '_routing' => isset($query['_routing']) ? $query['_routing'] : null,
  411.                     '_parent' => isset($query['_parent']) ? $query['_parent'] : null,
  412.                 ]
  413.             ),
  414.         ];
  415.         unset($query['_id'], $query['_ttl'], $query['_parent'], $query['_routing']);
  416.         switch ($operation) {
  417.             case 'index':
  418.             case 'create':
  419.             case 'update':
  420.                 $this->bulkQueries['body'][] = $query;
  421.                 break;
  422.             case 'delete':
  423.                 // Body for delete operation is not needed to apply.
  424.             default:
  425.                 // Do nothing.
  426.                 break;
  427.         }
  428.         // We are using counter because there is to difficult to resolve this from bulkQueries array.
  429.         $this->bulkCount++;
  430.         $response null;
  431.         if ($this->bulkCommitSize === $this->bulkCount) {
  432.             $response $this->commit();
  433.         }
  434.         return $response;
  435.     }
  436.     /**
  437.      * Optional setter to change bulk query params.
  438.      *
  439.      * @param array $params Possible keys:
  440.      *                      ['consistency'] = (enum) Explicit write consistency setting for the operation.
  441.      *                      ['refresh']     = (boolean) Refresh the index after performing the operation.
  442.      *                      ['replication'] = (enum) Explicitly set the replication type.
  443.      */
  444.     public function setBulkParams(array $params)
  445.     {
  446.         $this->bulkParams $params;
  447.     }
  448.     /**
  449.      * Creates fresh elasticsearch index.
  450.      *
  451.      * @param bool $noMapping Determines if mapping should be included.
  452.      *
  453.      * @return array
  454.      */
  455.     public function createIndex($noMapping false)
  456.     {
  457.         if ($noMapping) {
  458.             unset($this->indexSettings['body']['mappings']);
  459.         }
  460.         return $this->getClient()->indices()->create($this->indexSettings);
  461.     }
  462.     /**
  463.      * Drops elasticsearch index.
  464.      */
  465.     public function dropIndex()
  466.     {
  467.         return $this->getClient()->indices()->delete(['index' => $this->getIndexName()]);
  468.     }
  469.     /**
  470.      * Tries to drop and create fresh elasticsearch index.
  471.      *
  472.      * @param bool $noMapping Determines if mapping should be included.
  473.      *
  474.      * @return array
  475.      */
  476.     public function dropAndCreateIndex($noMapping false)
  477.     {
  478.         try {
  479.             if ($this->indexExists()) {
  480.                 $this->dropIndex();
  481.             }
  482.         } catch (\Exception $e) {
  483.             // Do nothing, our target is to create new index.
  484.         }
  485.         return $this->createIndex($noMapping);
  486.     }
  487.     /**
  488.      * Checks if connection index is already created.
  489.      *
  490.      * @return bool
  491.      */
  492.     public function indexExists()
  493.     {
  494.         return $this->getClient()->indices()->exists(['index' => $this->getIndexName()]);
  495.     }
  496.     /**
  497.      * Returns index name this connection is attached to.
  498.      *
  499.      * @return string
  500.      */
  501.     public function getIndexName()
  502.     {
  503.         return $this->indexSettings['index'];
  504.     }
  505.     /**
  506.      * Sets index name for this connection.
  507.      *
  508.      * @param string $name
  509.      */
  510.     public function setIndexName($name)
  511.     {
  512.         $this->indexSettings['index'] = $name;
  513.     }
  514.     /**
  515.      * Returns mappings of the index for this connection.
  516.      *
  517.      * @return array
  518.      */
  519.     public function getIndexMappings()
  520.     {
  521.         return $this->indexSettings['body']['mappings'];
  522.     }
  523.     /**
  524.      * Returns Elasticsearch version number.
  525.      *
  526.      * @return string
  527.      */
  528.     public function getVersionNumber()
  529.     {
  530.         return $this->client->info()['version']['number'];
  531.     }
  532.     /**
  533.      * Clears elasticsearch client cache.
  534.      */
  535.     public function clearCache()
  536.     {
  537.         $this->getClient()->indices()->clearCache(['index' => $this->getIndexName()]);
  538.     }
  539.     /**
  540.      * Returns a single document by ID. Returns NULL if document was not found.
  541.      *
  542.      * @param string $className Document class name or Elasticsearch type name
  543.      * @param string $id        Document ID to find
  544.      * @param string $routing   Custom routing for the document
  545.      *
  546.      * @return object
  547.      */
  548.     public function find($className$id$routing null)
  549.     {
  550.         $type $this->resolveTypeName($className);
  551.         $params = [
  552.             'index' => $this->getIndexName(),
  553.             'type' => $type,
  554.             'id' => $id,
  555.         ];
  556.         if ($routing) {
  557.             $params['routing'] = $routing;
  558.         }
  559.         try {
  560.             $result $this->getClient()->get($params);
  561.         } catch (Missing404Exception $e) {
  562.             return null;
  563.         }
  564.         return $this->getConverter()->convertToDocument($result$this);
  565.     }
  566.     /**
  567.      * Fetches next set of results.
  568.      *
  569.      * @param string $scrollId
  570.      * @param string $scrollDuration
  571.      *
  572.      * @return mixed
  573.      *
  574.      * @throws \Exception
  575.      */
  576.     public function scroll(
  577.         $scrollId,
  578.         $scrollDuration '5m'
  579.     ) {
  580.         $results $this->getClient()->scroll(['scroll_id' => $scrollId'scroll' => $scrollDuration]);
  581.         return $results;
  582.     }
  583.     /**
  584.      * Clears scroll.
  585.      *
  586.      * @param string $scrollId
  587.      */
  588.     public function clearScroll($scrollId)
  589.     {
  590.         $this->getClient()->clearScroll(['scroll_id' => $scrollId]);
  591.     }
  592.     /**
  593.      * Calls "Get Settings API" in Elasticsearch and will return you the currently configured settings.
  594.      *
  595.      * return array
  596.      */
  597.     public function getSettings()
  598.     {
  599.         return $this->getClient()->indices()->getSettings(['index' => $this->getIndexName()]);
  600.     }
  601.     /**
  602.      * Gets Elasticsearch aliases information.
  603.      * @param $params
  604.      *
  605.      * @return array
  606.      */
  607.     public function getAliases($params = [])
  608.     {
  609.         return $this->getClient()->indices()->getAliases(array_merge(['index' => $this->getIndexName()], $params));
  610.     }
  611.     /**
  612.      * Resolves type name by class name.
  613.      *
  614.      * @param string $className
  615.      *
  616.      * @return string
  617.      */
  618.     private function resolveTypeName($className)
  619.     {
  620.         if (strpos($className':') !== false || strpos($className'\\') !== false) {
  621.             return $this->getMetadataCollector()->getDocumentType($className);
  622.         }
  623.         return $className;
  624.     }
  625.     /**
  626.      * Starts and stops an event in the stopwatch
  627.      *
  628.      * @param string $action   only 'start' and 'stop'
  629.      * @param string $name     name of the event
  630.      */
  631.     private function stopwatch($action$name)
  632.     {
  633.         if (isset($this->stopwatch)) {
  634.             $this->stopwatch->$action('ongr_es: '.$name'ongr_es');
  635.         }
  636.     }
  637.     private function dispatch($eventName$event)
  638.     {
  639.         if ($this->eventDispatcher instanceof ContractsEventDispatcherInterface
  640.             || class_exists(LegacyEventDispatcherProxy::class)) {
  641.             return $this->eventDispatcher->dispatch($event$eventName);
  642.         } else {
  643.             return $this->eventDispatcher->dispatch($eventName$event);
  644.         }
  645.     }
  646. }