C#實現(xiàn)外排序的示例代碼
一、N 路歸并排序
1.1、概序
我們知道算法中有一種叫做分治思想,一個大問題我們可以采取分而治之,各個突破,當子問題解決了,大問題也就 KO 了,還有一點我們知道內(nèi)排序的歸并排序是采用二路歸并的,因為分治后有 LogN 層,每層兩路歸并需要 N 的時候,最后復雜度為 NlogN,那么外排序我們可以將這個“二”擴大到 M,也就是將一個大文件分成 M 個小文件,每個小文件是有序的,然后對應在內(nèi)存中我們開 M 個優(yōu)先隊列,每個隊列從對應編號的文件中讀取 TopN 條記錄,然后我們從 M 路隊列中各取一個數(shù)字進入中轉(zhuǎn)站隊列,并將該數(shù)字打上隊列編號標記,當從中轉(zhuǎn)站出來的最小數(shù)字就是我們最后要排序的數(shù)字之一,因為該數(shù)字打上了隊列編號,所以方便我們通知對應的編號隊列繼續(xù)出數(shù)字進入中轉(zhuǎn)站隊列,可以看出中轉(zhuǎn)站一直保存了 M 個記錄,當中轉(zhuǎn)站中的所有數(shù)字都出隊完畢,則外排序結(jié)束。如果大家有點蒙的話,我再配合一張圖,相信大家就會一目了然,這考驗的是我們的架構(gòu)能力。

圖中這里有個 Batch 容器,這個容器我是基于性能考慮的,當 batch=n 時,我們定時刷新到文件中,保證內(nèi)存有足夠的空間。
1.2、構(gòu)建
<1> 生成數(shù)據(jù)
這個基本沒什么好說的,采用隨機數(shù)生成 n 條記錄?!?br /><2> 切分數(shù)據(jù)
根據(jù)實際情況我們來決定到底要分成多少個小文件,并且小文件的數(shù)據(jù)必須是有序的,小文件的個數(shù)也對應這內(nèi)存中有多少個優(yōu)先隊列?!?br /><3> 加入隊列
我們知道內(nèi)存隊列存放的只是小文件的 topN 條記錄,當內(nèi)存隊列為空時,我們需要再次從小文件中讀取下一批的 TopN 條數(shù)據(jù),然后放入中轉(zhuǎn)站繼續(xù)進行比較。
<4> 測試
最后我們來測試一下:
數(shù)據(jù)量:short.MaxValue。
內(nèi)存存放量:1200。
在這種場景下,我們決定每個文件放 1000 條,也就有 33 個小文件,也就有 33 個內(nèi)存隊列,每個隊列取 Top100 條,Batch=500 時刷新
硬盤,中轉(zhuǎn)站存放 332 個數(shù)字(因為入中轉(zhuǎn)站時打上了隊列標記),最后內(nèi)存活動最大總數(shù)為:sum=33100+500+66=896<1200。
時間復雜度為 N*logN。

總的代碼:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Diagnostics;
using System.Threading;
using System.IO;
using System.Threading.Tasks;
namespace ConsoleApplication2
{
public class Program
{
public static void Main()
{
//生成2^15數(shù)據(jù)
CreateData(short.MaxValue);
//每個文件存放1000條
var pageSize = 1000;
//達到batchCount就刷新記錄
var batchCount = 0;
//判斷需要開啟的隊列
var pageCount = Split(pageSize);
//內(nèi)存限制:1500條
List<PriorityQueue<int?>> list = new List<PriorityQueue<int?>>();
//定義一個隊列中轉(zhuǎn)器
PriorityQueue<int?> queueControl = new PriorityQueue<int?>();
//定義每個隊列完成狀態(tài)
bool[] complete = new bool[pageCount];
//隊列讀取文件時應該跳過的記錄數(shù)
int[] skip = new int[pageCount];
//是否所有都完成了
int allcomplete = 0;
//定義 10 個隊列
for (int i = 0; i < pageCount; i++)
{
list.Add(new PriorityQueue<int?>());
//i: 記錄當前的隊列編碼
//list: 隊列數(shù)據(jù)
//skip:跳過的條數(shù)
AddQueue(i, list, ref skip);
}
//初始化操作,從每個隊列中取出一條記錄,并且在入隊的過程中
//記錄該數(shù)據(jù)所屬的 “隊列編號”
for (int i = 0; i < list.Count; i++)
{
var temp = list[i].Dequeue();
//i:隊列編碼,level:要排序的數(shù)據(jù)
queueControl.Eequeue(i, temp.level);
}
//默認500條寫入一次文件
List<int> batch = new List<int>();
//記錄下次應該從哪一個隊列中提取數(shù)據(jù)
int nextIndex = 0;
while (queueControl.Count() > 0)
{
//從中轉(zhuǎn)器中提取數(shù)據(jù)
var single = queueControl.Dequeue();
//記錄下一個隊列總應該出隊的數(shù)據(jù)
nextIndex = single.t.Value;
var nextData = list[nextIndex].Dequeue();
//如果改對內(nèi)彈出為null,則說明該隊列已經(jīng),需要從nextIndex文件中讀取數(shù)據(jù)
if (nextData == null)
{
//如果該隊列沒有全部讀取完畢
if (!complete[nextIndex])
{
AddQueue(nextIndex, list, ref skip);
//如果從文件中讀取還是沒有,則說明改文件中已經(jīng)沒有數(shù)據(jù)了
if (list[nextIndex].Count() == 0)
{
complete[nextIndex] = true;
allcomplete++;
}
else
{
nextData = list[nextIndex].Dequeue();
}
}
}
//如果彈出的數(shù)不為空,則將該數(shù)入中轉(zhuǎn)站
if (nextData != null)
{
//將要出隊的數(shù)據(jù) 轉(zhuǎn)入 中轉(zhuǎn)站
queueControl.Eequeue(nextIndex, nextData.level);
}
batch.Add(single.level);
//如果batch=500,或者所有的文件都已經(jīng)讀取完畢,此時我們要批量刷入數(shù)據(jù)
if (batch.Count == batchCount || allcomplete == pageCount)
{
var sw = new StreamWriter(Environment.CurrentDirectory + "http://result.txt", true);
foreach (var item in batch)
{
sw.WriteLine(item);
}
sw.Close();
batch.Clear();
}
}
Console.WriteLine("恭喜,外排序完畢!");
Console.Read();
}
#region 將數(shù)據(jù)加入指定編號隊列
/// <summary>
/// 將數(shù)據(jù)加入指定編號隊列
/// </summary>
/// <param name="i">隊列編號</param>
/// <param name="skip">文件中跳過的條數(shù)</param>
/// <param name="list"></param>
/// <param name="top">需要每次讀取的條數(shù)</param>
public static void AddQueue(int i, List<PriorityQueue<int?>> list, ref int[] skip, int top = 100)
{
var result = File.ReadAllLines((Environment.CurrentDirectory + "http://" + (i + 1) + ".txt"))
.Skip(skip[i]).Take(top).Select(j => Convert.ToInt32(j));
//加入到集合中
foreach (var item in result)
list[i].Eequeue(null, item);
//將個數(shù)累計到skip中,表示下次要跳過的記錄數(shù)
skip[i] += result.Count();
}
#endregion
#region 隨機生成數(shù)據(jù)
/// <summary>
/// 隨機生成數(shù)據(jù)
///<param name="max">執(zhí)行生成的數(shù)據(jù)上線</param>
/// </summary>
public static void CreateData(int max)
{
var sw = new StreamWriter(Environment.CurrentDirectory + "http://demo.txt");
for (int i = 0; i < max; i++)
{
Thread.Sleep(2);
var rand = new Random((int)DateTime.Now.Ticks).Next(0, int.MaxValue >> 3);
sw.WriteLine(rand);
}
sw.Close();
}
#endregion
#region 將數(shù)據(jù)進行分份
/// <summary>
/// 將數(shù)據(jù)進行分份
/// <param name="size">每頁要顯示的條數(shù)</param>
/// </summary>
public static int Split(int size)
{
//文件總記錄數(shù)
int totalCount = 0;
//每一份文件存放 size 條 記錄
List<int> small = new List<int>();
var sr = new StreamReader((Environment.CurrentDirectory + "http://demo.txt"));
var pageSize = size;
int pageCount = 0;
int pageIndex = 0;
while (true)
{
var line = sr.ReadLine();
if (!string.IsNullOrEmpty(line))
{
totalCount++;
//加入小集合中
small.Add(Convert.ToInt32(line));
//說明已經(jīng)到達指定的 size 條數(shù)了
if (totalCount % pageSize == 0)
{
pageIndex = totalCount / pageSize;
small = small.OrderBy(i => i).Select(i => i).ToList();
File.WriteAllLines(Environment.CurrentDirectory + "http://" + pageIndex + ".txt", small.Select(i => i.ToString()));
small.Clear();
}
}
else
{
//說明已經(jīng)讀完了,將剩余的small記錄寫入到文件中
pageCount = (int)Math.Ceiling((double)totalCount / pageSize);
small = small.OrderBy(i => i).Select(i => i).ToList();
File.WriteAllLines(Environment.CurrentDirectory + "http://" + pageCount + ".txt", small.Select(i => i.ToString()));
break;
}
}
return pageCount;
}
#endregion
}
}
優(yōu)先隊列:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Diagnostics;
using System.Threading;
using System.IO;
namespace ConsoleApplication2
{
public class PriorityQueue<T>
{
/// <summary>
/// 定義一個數(shù)組來存放節(jié)點
/// </summary>
private List<HeapNode> nodeList = new List<HeapNode>();
#region 堆節(jié)點定義
/// <summary>
/// 堆節(jié)點定義
/// </summary>
public class HeapNode
{
/// <summary>
/// 實體數(shù)據(jù)
/// </summary>
public T t { get; set; }
/// <summary>
/// 優(yōu)先級別 1-10個級別 (優(yōu)先級別遞增)
/// </summary>
public int level { get; set; }
public HeapNode(T t, int level)
{
this.t = t;
this.level = level;
}
public HeapNode() { }
}
#endregion
#region 添加操作
/// <summary>
/// 添加操作
/// </summary>
public void Eequeue(T t, int level = 1)
{
//將當前節(jié)點追加到堆尾
nodeList.Add(new HeapNode(t, level));
//如果只有一個節(jié)點,則不需要進行篩操作
if (nodeList.Count == 1)
return;
//獲取最后一個非葉子節(jié)點
int parent = nodeList.Count / 2 - 1;
//堆調(diào)整
UpHeapAdjust(nodeList, parent);
}
#endregion
#region 對堆進行上濾操作,使得滿足堆性質(zhì)
/// <summary>
/// 對堆進行上濾操作,使得滿足堆性質(zhì)
/// </summary>
/// <param name="nodeList"></param>
/// <param name="index">非葉子節(jié)點的之后指針(這里要注意:我們
/// 的篩操作時針對非葉節(jié)點的)
/// </param>
public void UpHeapAdjust(List<HeapNode> nodeList, int parent)
{
while (parent >= 0)
{
//當前index節(jié)點的左孩子
var left = 2 * parent + 1;
//當前index節(jié)點的右孩子
var right = left + 1;
//parent子節(jié)點中最大的孩子節(jié)點,方便于parent進行比較
//默認為left節(jié)點
var min = left;
//判斷當前節(jié)點是否有右孩子
if (right < nodeList.Count)
{
//判斷parent要比較的最大子節(jié)點
min = nodeList[left].level < nodeList[right].level ? left : right;
}
//如果parent節(jié)點大于它的某個子節(jié)點的話,此時篩操作
if (nodeList[parent].level > nodeList[min].level)
{
//子節(jié)點和父節(jié)點進行交換操作
var temp = nodeList[parent];
nodeList[parent] = nodeList[min];
nodeList[min] = temp;
//繼續(xù)進行更上一層的過濾
parent = (int)Math.Ceiling(parent / 2d) - 1;
}
else
{
break;
}
}
}
#endregion
#region 優(yōu)先隊列的出隊操作
/// <summary>
/// 優(yōu)先隊列的出隊操作
/// </summary>
/// <returns></returns>
public HeapNode Dequeue()
{
if (nodeList.Count == 0)
return null;
//出隊列操作,彈出數(shù)據(jù)頭元素
var pop = nodeList[0];
//用尾元素填充頭元素
nodeList[0] = nodeList[nodeList.Count - 1];
//刪除尾節(jié)點
nodeList.RemoveAt(nodeList.Count - 1);
//然后從根節(jié)點下濾堆
DownHeapAdjust(nodeList, 0);
return pop;
}
#endregion
#region 對堆進行下濾操作,使得滿足堆性質(zhì)
/// <summary>
/// 對堆進行下濾操作,使得滿足堆性質(zhì)
/// </summary>
/// <param name="nodeList"></param>
/// <param name="index">非葉子節(jié)點的之后指針(這里要注意:我們
/// 的篩操作時針對非葉節(jié)點的)
/// </param>
public void DownHeapAdjust(List<HeapNode> nodeList, int parent)
{
while (2 * parent + 1 < nodeList.Count)
{
//當前index節(jié)點的左孩子
var left = 2 * parent + 1;
//當前index節(jié)點的右孩子
var right = left + 1;
//parent子節(jié)點中最大的孩子節(jié)點,方便于parent進行比較
//默認為left節(jié)點
var min = left;
//判斷當前節(jié)點是否有右孩子
if (right < nodeList.Count)
{
//判斷parent要比較的最大子節(jié)點
min = nodeList[left].level < nodeList[right].level ? left : right;
}
//如果parent節(jié)點小于它的某個子節(jié)點的話,此時篩操作
if (nodeList[parent].level > nodeList[min].level)
{
//子節(jié)點和父節(jié)點進行交換操作
var temp = nodeList[parent];
nodeList[parent] = nodeList[min];
nodeList[min] = temp;
//繼續(xù)進行更下一層的過濾
parent = min;
}
else
{
break;
}
}
}
#endregion
#region 獲取元素并下降到指定的level級別
/// <summary>
/// 獲取元素并下降到指定的level級別
/// </summary>
/// <returns></returns>
public HeapNode GetAndDownPriority(int level)
{
if (nodeList.Count == 0)
return null;
//獲取頭元素
var pop = nodeList[0];
//設置指定優(yōu)先級(如果為 MinValue 則為 -- 操作)
nodeList[0].level = level == int.MinValue ? --nodeList[0].level : level;
//下濾堆
DownHeapAdjust(nodeList, 0);
return nodeList[0];
}
#endregion
#region 獲取元素并下降優(yōu)先級
/// <summary>
/// 獲取元素并下降優(yōu)先級
/// </summary>
/// <returns></returns>
public HeapNode GetAndDownPriority()
{
//下降一個優(yōu)先級
return GetAndDownPriority(int.MinValue);
}
#endregion
#region 返回當前優(yōu)先隊列中的元素個數(shù)
/// <summary>
/// 返回當前優(yōu)先隊列中的元素個數(shù)
/// </summary>
/// <returns></returns>
public int Count()
{
return nodeList.Count;
}
#endregion
}
}
到此這篇關于C#實現(xiàn)外排序的示例代碼的文章就介紹到這了,更多相關C# 外排序內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!

