PHP数据湖与数据联邦查询

数据湖集中存储各种格式的数据，数据联邦允许跨多个数据源进行查询。今天说说PHP中数据湖和数据联邦的实现。

数据湖的核心是存储原始数据，不要求预先定义Schema。PHP可以将各种数据导入湖中，并提供统一的查询接口。

<?php

/**
 * Minimal file-based data lake.
 *
 * Payloads are written as files under {storageDir}/{source}/Y/m/d/<random>.<format>
 * without any schema. A per-source in-memory catalog is filled lazily: files already on
 * disk are scanned the first time a source is touched (ingest, query or stats), and
 * every ingest appends its own entry afterwards.
 */
class DataLake
{
    /** Escape char passed explicitly to the CSV functions (PHP's historical default). */
    private const CSV_ESCAPE = '\\';

    private string $storageDir;

    /** @var array<string, list<array{path: string, format: string, size: int, ingested_at: string}>> */
    private array $catalogs = [];

    /** @var array<string, true> sources whose directory has already been scanned */
    private array $scanned = [];

    /**
     * @param string $storageDir Root directory of the lake; created if missing.
     * @throws RuntimeException when the directory cannot be created.
     */
    public function __construct(string $storageDir = '/var/data_lake')
    {
        $this->storageDir = rtrim($storageDir, '/');
        $this->ensureDirectory($this->storageDir === '' ? '/' : $this->storageDir);
    }

    /**
     * Stores one raw payload and records it in the catalog.
     *
     * @param string $source Source (sub-directory) name; must not contain path separators.
     * @param string $format 'json', 'csv' (list of records), 'txt'; any other format is PHP-serialized.
     * @return string Path of the written file.
     * @throws InvalidArgumentException for unsafe source/format names.
     * @throws JsonException when $data cannot be JSON-encoded.
     * @throws RuntimeException when the file or its directory cannot be written.
     */
    public function ingest(string $source, string $format, mixed $data): string
    {
        $this->assertSafeSegment($source, 'source');
        $this->assertSafeSegment($format, 'format');
        // Catalog files from earlier runs first; otherwise the entry appended below
        // would make later queries treat the source as already fully known.
        $this->scanSource($source);

        $fullDir = "{$this->storageDir}/{$source}/" . date('Y/m/d');
        $this->ensureDirectory($fullDir);
        // Random name: uniqid() is purely time-based, so concurrent writers can collide.
        $filepath = "{$fullDir}/" . bin2hex(random_bytes(8)) . ".{$format}";

        $content = match ($format) {
            'json' => json_encode($data, JSON_UNESCAPED_UNICODE | JSON_PRETTY_PRINT | JSON_THROW_ON_ERROR),
            'csv' => $this->toCsv($data),
            'txt' => is_string($data) ? $data : json_encode($data, JSON_THROW_ON_ERROR),
            default => serialize($data),
        };
        if (file_put_contents($filepath, $content) === false) {
            throw new RuntimeException("Cannot write lake file: {$filepath}");
        }

        // Update the in-memory catalog.
        $this->catalogs[$source][] = [
            'path' => $filepath,
            'format' => $format,
            'size' => strlen($content),
            'ingested_at' => date('Y-m-d H:i:s'),
        ];

        return $filepath;
    }

    /**
     * Returns the contents of every file of $source, optionally filtered.
     *
     * Filters map a field to either a plain value (equality) or
     * ['operator' => '=|==|!=|<>|>|>=|<|<=', 'value' => ...]. Files holding a list of
     * records (JSON arrays, CSV) are filtered row by row and contribute only their
     * matching rows; single-record files are kept or dropped as a whole.
     *
     * @return list<mixed> one element per file that has at least one match
     * @throws InvalidArgumentException for an unsafe source name or unknown operator.
     */
    public function query(string $source, array $filters = []): array
    {
        $this->assertSafeSegment($source, 'source');
        $this->scanSource($source);

        $results = [];
        foreach ($this->catalogs[$source] ?? [] as $entry) {
            $data = $this->readFile($entry);
            if ($filters === []) {
                $results[] = $data;
                continue;
            }
            if ($this->isRowList($data)) {
                $rows = array_values(array_filter(
                    $data,
                    fn (array $row): bool => $this->matchesRow($row, $filters),
                ));
                if ($rows !== []) {
                    $results[] = $rows;
                }
            } elseif (is_array($data) && $this->matchesRow($data, $filters)) {
                $results[] = $data;
            }
        }

        return $results;
    }

    /**
     * Runs query() on every source directory and prefixes each hit with '_source'.
     *
     * @return list<array<int|string, mixed>>
     */
    public function queryAll(array $filters = []): array
    {
        $results = [];
        foreach ($this->sourceNames() as $source) {
            foreach ($this->query($source, $filters) as $item) {
                $results[] = array_merge(['_source' => $source], (array) $item);
            }
        }

        return $results;
    }

    /**
     * File count and byte size per source (disk included), plus totals.
     *
     * @return array{sources: array<string, array{files: int, size: int}>, total_files: int, total_size: string}
     */
    public function getStats(): array
    {
        foreach ($this->sourceNames() as $source) {
            $this->scanSource($source);
        }

        $sources = [];
        $totalFiles = 0;
        $totalSize = 0;
        foreach ($this->catalogs as $source => $entries) {
            $sourceFiles = count($entries);
            $sourceSize = array_sum(array_column($entries, 'size'));
            $sources[$source] = ['files' => $sourceFiles, 'size' => $sourceSize];
            $totalFiles += $sourceFiles;
            $totalSize += $sourceSize;
        }

        return [
            'sources' => $sources,
            'total_files' => $totalFiles,
            'total_size' => round($totalSize / 1024 / 1024, 2) . ' MB',
        ];
    }

    /** Adds files on disk for $source to the catalog; runs at most once per source. */
    private function scanSource(string $source): void
    {
        if (isset($this->scanned[$source])) {
            return;
        }
        $this->scanned[$source] = true;

        $sourceDir = "{$this->storageDir}/{$source}";
        if (!is_dir($sourceDir)) {
            return;
        }
        $files = new RecursiveIteratorIterator(
            new RecursiveDirectoryIterator($sourceDir, FilesystemIterator::SKIP_DOTS),
        );
        foreach ($files as $file) {
            if (!$file->isFile()) {
                continue;
            }
            $this->catalogs[$source][] = [
                'path' => $file->getPathname(),
                'format' => $file->getExtension(),
                'size' => $file->getSize(),
                'ingested_at' => date('Y-m-d H:i:s', $file->getMTime()),
            ];
        }
    }

    /**
     * Names of the non-hidden sub-directories of the lake root, sorted.
     *
     * @return list<string>
     */
    private function sourceNames(): array
    {
        $entries = is_dir($this->storageDir) ? scandir($this->storageDir) : false;
        if ($entries === false) {
            return [];
        }

        return array_values(array_filter(
            $entries,
            fn (string $name): bool => !str_starts_with($name, '.') && is_dir("{$this->storageDir}/{$name}"),
        ));
    }

    /**
     * Decodes a catalogued file: JSON to arrays, CSV to keyed rows, anything else raw.
     *
     * @throws RuntimeException when the file cannot be read.
     */
    private function readFile(array $entry): mixed
    {
        $content = file_get_contents($entry['path']);
        if ($content === false) {
            throw new RuntimeException("Cannot read lake file: {$entry['path']}");
        }

        return match ($entry['format']) {
            'json' => json_decode($content, true),
            'csv' => $this->parseCsv($content),
            default => $content,
        };
    }

    /** True for a non-empty 0..n-1 list whose every element is an array (a record). */
    private function isRowList(mixed $data): bool
    {
        if (!is_array($data) || $data === [] || array_keys($data) !== range(0, count($data) - 1)) {
            return false;
        }
        foreach ($data as $row) {
            if (!is_array($row)) {
                return false;
            }
        }

        return true;
    }

    /** True when $row has every filtered field and each one satisfies its condition. */
    private function matchesRow(array $row, array $filters): bool
    {
        foreach ($filters as $key => $condition) {
            if (!array_key_exists($key, $row) || !$this->satisfies($row[$key], $condition)) {
                return false;
            }
        }

        return true;
    }

    /**
     * Evaluates one filter condition against a field value.
     *
     * Comparisons are deliberately loose (==, <, ...) because CSV cells are always
     * strings while callers typically filter with ints or floats.
     *
     * @throws InvalidArgumentException for an unsupported operator.
     */
    private function satisfies(mixed $actual, mixed $condition): bool
    {
        if (!is_array($condition) || !array_key_exists('operator', $condition)) {
            return $actual == $condition;
        }
        $expected = $condition['value'] ?? null;

        return match ($condition['operator']) {
            '=', '==' => $actual == $expected,
            '!=', '<>' => $actual != $expected,
            '>' => $actual > $expected,
            '>=' => $actual >= $expected,
            '<' => $actual < $expected,
            '<=' => $actual <= $expected,
            default => throw new InvalidArgumentException(
                'Unsupported filter operator: ' . var_export($condition['operator'], true),
            ),
        };
    }

    /** Serialises a list of records to CSV, header taken from the first record's keys. */
    private function toCsv(array $data): string
    {
        if ($data === []) {
            return '';
        }
        $rows = array_map(static fn (mixed $row): array => (array) $row, array_values($data));
        $headers = array_keys($rows[0]);

        $stream = fopen('php://temp', 'r+');
        try {
            fputcsv($stream, $headers, escape: self::CSV_ESCAPE);
            foreach ($rows as $row) {
                // Emit cells in header order so records with differently ordered (or
                // missing) keys still line up with the header when read back.
                $cells = array_map(static fn (int|string $key): mixed => $row[$key] ?? '', $headers);
                fputcsv($stream, $cells, escape: self::CSV_ESCAPE);
            }
            rewind($stream);

            return (string) stream_get_contents($stream);
        } finally {
            fclose($stream);
        }
    }

    /**
     * Parses CSV with a header line into keyed rows; ragged and blank rows are skipped.
     * Uses fgetcsv on a stream so quoted fields containing newlines stay intact.
     */
    private function parseCsv(string $content): array
    {
        $content = trim($content, "\r\n");
        if (trim($content) === '') {
            return [];
        }

        $stream = fopen('php://temp', 'r+');
        try {
            fwrite($stream, $content);
            rewind($stream);
            $headers = fgetcsv($stream, escape: self::CSV_ESCAPE);
            if (!is_array($headers)) {
                return [];
            }
            $rows = [];
            while (($fields = fgetcsv($stream, escape: self::CSV_ESCAPE)) !== false) {
                // fgetcsv returns [null] for a blank line.
                if ($fields !== [null] && count($fields) === count($headers)) {
                    $rows[] = array_combine($headers, $fields);
                }
            }

            return $rows;
        } finally {
            fclose($stream);
        }
    }

    /** Creates $dir recursively unless it exists; re-checks to tolerate concurrent creation. */
    private function ensureDirectory(string $dir): void
    {
        if (!is_dir($dir) && !mkdir($dir, 0755, true) && !is_dir($dir)) {
            throw new RuntimeException("Cannot create directory: {$dir}");
        }
    }

    /** Rejects names that would escape the lake directory when used as a path segment. */
    private function assertSafeSegment(string $value, string $label): void
    {
        if ($value === '' || $value === '.' || $value === '..' || strpbrk($value, "/\\\0") !== false) {
            throw new InvalidArgumentException("Invalid {$label} name: {$value}");
        }
    }
}

// The default /var/data_lake usually needs root privileges, so the demo uses the temp dir.
$lake = new DataLake(sys_get_temp_dir() . '/data_lake');
$lake->ingest('orders', 'json', [
    ['order_id' => 1, 'amount' => 100, 'status' => 'paid'],
    ['order_id' => 2, 'amount' => 200, 'status' => 'unpaid'],
]);
$lake->ingest('users', 'csv', [
    ['user_id' => 1, 'name' => '张三'],
    ['user_id' => 2, 'name' => '李四'],
]);
print_r($lake->getStats());
?>

数据联邦查询可以向多个数据源发送同一查询，并按数据源汇总结果：

<?php

/**
 * A queryable backend taking part in federation.
 */
interface DataSource
{
    /**
     * Runs $sql against the backend.
     *
     * @return list<array<string, mixed>> result rows
     */
    public function query(string $sql): array;

    /** @return array{type: string, tables: list<string>} */
    public function getSchema(): array;
}

/**
 * DataSource backed by a PDO (MySQL) connection.
 *
 * NOTE: query() executes the SQL string as-is; never build it from untrusted input.
 */
class MySqlSource implements DataSource
{
    public function __construct(private PDO $pdo)
    {
    }

    public function query(string $sql): array
    {
        return $this->run($sql)->fetchAll(PDO::FETCH_ASSOC);
    }

    public function getSchema(): array
    {
        return ['type' => 'mysql', 'tables' => $this->run('SHOW TABLES')->fetchAll(PDO::FETCH_COLUMN)];
    }

    /**
     * Executes $sql, turning a silent `false` (ERRMODE_SILENT/WARNING) into an exception
     * so callers never call fetchAll() on a bool.
     *
     * @throws RuntimeException when the statement fails.
     */
    private function run(string $sql): PDOStatement
    {
        $statement = $this->pdo->query($sql);
        if ($statement === false) {
            throw new RuntimeException('Query failed: ' . ($this->pdo->errorInfo()[2] ?? 'unknown error'));
        }

        return $statement;
    }
}

/**
 * DataSource reading {dir}/{table}.csv files (header line + rows).
 */
class CsvSource implements DataSource
{
    public function __construct(private string $dir)
    {
    }

    /**
     * Simplified implementation: only the table name after FROM is used; column lists,
     * WHERE, ORDER BY, etc. are ignored and every well-formed row is returned.
     *
     * @throws RuntimeException when an existing CSV file cannot be opened.
     */
    public function query(string $sql): array
    {
        // Table names are limited to word chars, '.' and '-', so they cannot leave $dir.
        if (preg_match('/\bFROM\s+[`"]?([\w.\-]+)/i', $sql, $matches) !== 1) {
            return [];
        }
        $file = "{$this->dir}/{$matches[1]}.csv";
        if (!is_file($file)) {
            return [];
        }

        $handle = fopen($file, 'r');
        if ($handle === false) {
            throw new RuntimeException("Cannot open CSV file: {$file}");
        }
        try {
            $headers = fgetcsv($handle, escape: '\\');
            if (!is_array($headers)) {
                return [];
            }
            $results = [];
            while (($row = fgetcsv($handle, escape: '\\')) !== false) {
                // Skip blank lines ([null]) and ragged rows that cannot be keyed by the header.
                if ($row !== [null] && count($row) === count($headers)) {
                    $results[] = array_combine($headers, $row);
                }
            }

            return $results;
        } finally {
            fclose($handle);
        }
    }

    public function getSchema(): array
    {
        $files = glob("{$this->dir}/*.csv") ?: [];

        return ['type' => 'csv', 'tables' => array_map(static fn (string $f): string => basename($f, '.csv'), $files)];
    }
}

/**
 * Sends one query to every registered DataSource and collects the results per source.
 * A failing source is reported in its own entry instead of aborting the others.
 */
class FederationEngine
{
    /** @var array<string, DataSource> */
    private array $sources = [];

    public function addSource(string $name, DataSource $source): void
    {
        $this->sources[$name] = $source;
    }

    /** @return list<int|string> source names in registration order */
    public function getSources(): array
    {
        return array_keys($this->sources);
    }

    /**
     * @param string $query SQL passed unchanged to each selected source.
     * @param list<string> $sourceFilter Only query these sources; empty means all.
     * @return array<string, array{success: bool, count?: int, data?: array, error?: string}>
     */
    public function federatedQuery(string $query, array $sourceFilter = []): array
    {
        // Keys get the same normalisation as $this->sources ('1' => 1), and isset()
        // avoids in_array()'s loose numeric-string comparison.
        $allowed = array_fill_keys($sourceFilter, true);
        $results = [];
        foreach ($this->sources as $name => $source) {
            if ($allowed !== [] && !isset($allowed[$name])) {
                continue;
            }
            try {
                $data = $source->query($query);
                $results[$name] = [
                    'success' => true,
                    'count' => count($data),
                    'data' => $data,
                ];
            } catch (\Exception $e) {
                $results[$name] = [
                    'success' => false,
                    'error' => $e->getMessage(),
                ];
            }
        }

        return $results;
    }
}

$engine = new FederationEngine();
// Connecting happens outside federatedQuery()'s per-source error handling, so an
// unavailable MySQL server is handled here instead of aborting the whole demo.
if (extension_loaded('pdo_mysql')) {
    try {
        $engine->addSource('mysql', new MySqlSource(new PDO('mysql:host=localhost;dbname=test', 'root', '')));
    } catch (PDOException $e) {
        echo "mysql 数据源不可用，已跳过: {$e->getMessage()}\n";
    }
}
$engine->addSource('csv', new CsvSource(sys_get_temp_dir() . '/csv_data'));
echo '数据源: ' . implode(', ', $engine->getSources()) . "\n";
$results = $engine->federatedQuery('SELECT * FROM users');
print_r($results);
?>

数据湖和数据联邦是数据管理的进阶方案：数据湖存储原始数据，不丢失信息；数据联邦提供统一的查询入口。PHP实现的方案适合小规模场景，生产环境建议使用专业的湖仓一体平台。