// conv.h
// SNAP, a general purpose, high performance system for analysis and manipulation of large networks
// SNAP Library 6.0, Developer Reference (2020-12-09)
#ifndef CONV_H
#define CONV_H

namespace TSnap {

template<class PGraph>
PGraph ToGraph(PTable Table, const TStr& SrcCol, const TStr& DstCol, TAttrAggr AggrPolicy)
{
  PGraph Graph = PGraph::TObj::New();

  const TAttrType NodeType = Table->GetColType(SrcCol);
  Assert(NodeType == Table->GetColType(DstCol));
  const TInt SrcColIdx = Table->GetColIdx(SrcCol);
  const TInt DstColIdx = Table->GetColIdx(DstCol);

  // make a single pass over all rows in the table
  if (NodeType == atInt) {
    for (int CurrRowIdx = 0; CurrRowIdx < (Table->Next).Len(); CurrRowIdx++) {
      if ((Table->Next)[CurrRowIdx] == Table->Invalid) { continue; }
      // add src and dst nodes to the graph if they have not been seen earlier
      TInt SVal = (Table->IntCols)[SrcColIdx][CurrRowIdx];
      TInt DVal = (Table->IntCols)[DstColIdx][CurrRowIdx];
      // using AddNodeUnchecked ensures that no error is thrown when the same node is seen twice
      Graph->AddNodeUnchecked(SVal);
      Graph->AddNodeUnchecked(DVal);
      Graph->AddEdgeUnchecked(SVal, DVal);
    }
  } else if (NodeType == atFlt) {
    // node values - i.e. the unique values of the src/dst columns
    //THashSet<TInt> IntNodeVals; // for both int and string node attr types.
    THash<TFlt, TInt> FltNodeVals;
    for (int CurrRowIdx = 0; CurrRowIdx < (Table->Next).Len(); CurrRowIdx++) {
      if ((Table->Next)[CurrRowIdx] == Table->Invalid) { continue; }
      // add src and dst nodes to the graph if they have not been seen earlier
      TInt SVal, DVal;
      TFlt FSVal = (Table->FltCols)[SrcColIdx][CurrRowIdx];
      SVal = Table->CheckAndAddFltNode(Graph, FltNodeVals, FSVal);
      TFlt FDVal = (Table->FltCols)[DstColIdx][CurrRowIdx];
      DVal = Table->CheckAndAddFltNode(Graph, FltNodeVals, FDVal);
      Graph->AddEdge(SVal, DVal);
    }
  } else {
    for (int CurrRowIdx = 0; CurrRowIdx < (Table->Next).Len(); CurrRowIdx++) {
      if ((Table->Next)[CurrRowIdx] == Table->Invalid) { continue; }
      // add src and dst nodes to the graph if they have not been seen earlier
      TInt SVal = (Table->StrColMaps)[SrcColIdx][CurrRowIdx];
      // if (strlen(Table->GetContextKey(SVal)) == 0) { continue; } // illegal value
      TInt DVal = (Table->StrColMaps)[DstColIdx][CurrRowIdx];
      // if (strlen(Table->GetContextKey(DVal)) == 0) { continue; } // illegal value
      // using AddNodeUnchecked ensures that no error is thrown when the same node is seen twice
      Graph->AddNodeUnchecked(SVal);
      Graph->AddNodeUnchecked(DVal);
      Graph->AddEdgeUnchecked(SVal, DVal);
    }
  }

  Graph->SortNodeAdjV();
  return Graph;
}
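
// Usage sketch: build a directed graph from an edge table. Assumes a PTable
// EdgeTable with integer columns "SrcId" and "DstId" was loaded elsewhere
// (e.g. via TTable::LoadSS); ToGraph does not use the aggregation policy,
// which exists for signature compatibility with ToNetwork.
//
//   PNGraph G = TSnap::ToGraph<PNGraph>(EdgeTable, "SrcId", "DstId", aaFirst);
//   printf("Nodes: %d, Edges: %d\n", G->GetNodes(), G->GetEdges());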

template<class PGraph>
PGraph ToNetwork(PTable Table,
  const TStr& SrcCol, const TStr& DstCol,
  TStrV& SrcAttrV, TStrV& DstAttrV, TStrV& EdgeAttrV,
  TAttrAggr AggrPolicy)
{
  PGraph Graph = PGraph::TObj::New();

  const TAttrType NodeType = Table->GetColType(SrcCol);
  Assert(NodeType == Table->GetColType(DstCol));
  const TInt SrcColIdx = Table->GetColIdx(SrcCol);
  const TInt DstColIdx = Table->GetColIdx(DstCol);

  //Table->AddGraphAttributeV(SrcAttrV, false, true, false);
  //Table->AddGraphAttributeV(DstAttrV, false, false, true);
  //Table->AddGraphAttributeV(EdgeAttrV, true, false, true);

  // node values - i.e. the unique values of the src/dst columns
  //THashSet<TInt> IntNodeVals; // for both int and string node attr types.
  THash<TFlt, TInt> FltNodeVals;

  // node attributes
  THash<TInt, TStrIntVH> NodeIntAttrs;
  THash<TInt, TStrFltVH> NodeFltAttrs;
  THash<TInt, TStrStrVH> NodeStrAttrs;

  // make a single pass over all rows in the table
  for (int CurrRowIdx = 0; CurrRowIdx < (Table->Next).Len(); CurrRowIdx++) {
    if ((Table->Next)[CurrRowIdx] == Table->Invalid) {
      continue;
    }

    // add src and dst nodes to the graph if they have not been seen earlier
    TInt SVal, DVal;
    if (NodeType == atFlt) {
      TFlt FSVal = (Table->FltCols)[SrcColIdx][CurrRowIdx];
      SVal = Table->CheckAndAddFltNode(Graph, FltNodeVals, FSVal);
      TFlt FDVal = (Table->FltCols)[DstColIdx][CurrRowIdx];
      DVal = Table->CheckAndAddFltNode(Graph, FltNodeVals, FDVal);
    } else if (NodeType == atInt || NodeType == atStr) {
      if (NodeType == atInt) {
        SVal = (Table->IntCols)[SrcColIdx][CurrRowIdx];
        DVal = (Table->IntCols)[DstColIdx][CurrRowIdx];
      } else {
        SVal = (Table->StrColMaps)[SrcColIdx][CurrRowIdx];
        if (strlen(Table->GetContextKey(SVal)) == 0) { continue; } // illegal value
        DVal = (Table->StrColMaps)[DstColIdx][CurrRowIdx];
        if (strlen(Table->GetContextKey(DVal)) == 0) { continue; } // illegal value
      }
      if (!Graph->IsNode(SVal)) { Graph->AddNode(SVal); }
      if (!Graph->IsNode(DVal)) { Graph->AddNode(DVal); }
      //CheckAndAddIntNode(Graph, IntNodeVals, SVal);
      //CheckAndAddIntNode(Graph, IntNodeVals, DVal);
    }

    // add the edge and its attributes; the row index serves as the edge id
    Graph->AddEdge(SVal, DVal, CurrRowIdx);

    // aggregate edge attributes and add them to the graph
    for (TInt i = 0; i < EdgeAttrV.Len(); i++) {
      TStr ColName = EdgeAttrV[i];
      TAttrType T = Table->GetColType(ColName);
      TInt Index = Table->GetColIdx(ColName);
      switch (T) {
        case atInt:
          Graph->AddIntAttrDatE(CurrRowIdx, Table->IntCols[Index][CurrRowIdx], ColName);
          break;
        case atFlt:
          Graph->AddFltAttrDatE(CurrRowIdx, Table->FltCols[Index][CurrRowIdx], ColName);
          break;
        case atStr:
          Graph->AddStrAttrDatE(CurrRowIdx, Table->GetStrValIdx(Index, CurrRowIdx), ColName);
          break;
      }
    }

    // collect src and dst node attributes into hash maps
    if ((Table->SrcNodeAttrV).Len() > 0) {
      Table->AddNodeAttributes(SVal, Table->SrcNodeAttrV, CurrRowIdx, NodeIntAttrs, NodeFltAttrs, NodeStrAttrs);
    }

    if ((Table->DstNodeAttrV).Len() > 0) {
      Table->AddNodeAttributes(DVal, Table->DstNodeAttrV, CurrRowIdx, NodeIntAttrs, NodeFltAttrs, NodeStrAttrs);
    }
  }

  // aggregate node attributes and add them to the graph
  if ((Table->SrcNodeAttrV).Len() > 0 || (Table->DstNodeAttrV).Len() > 0) {
    for (TNEANet::TNodeI NodeI = Graph->BegNI(); NodeI < Graph->EndNI(); NodeI++) {
      TInt NId = NodeI.GetId();
      if (NodeIntAttrs.IsKey(NId)) {
        TStrIntVH IntAttrVals = NodeIntAttrs.GetDat(NId);
        for (TStrIntVH::TIter it = IntAttrVals.BegI(); it < IntAttrVals.EndI(); it++) {
          TInt AttrVal = Table->AggregateVector<TInt>(it.GetDat(), AggrPolicy);
          Graph->AddIntAttrDatN(NId, AttrVal, it.GetKey());
        }
      }
      if (NodeFltAttrs.IsKey(NId)) {
        TStrFltVH FltAttrVals = NodeFltAttrs.GetDat(NId);
        for (TStrFltVH::TIter it = FltAttrVals.BegI(); it < FltAttrVals.EndI(); it++) {
          TFlt AttrVal = Table->AggregateVector<TFlt>(it.GetDat(), AggrPolicy);
          Graph->AddFltAttrDatN(NId, AttrVal, it.GetKey());
        }
      }
      if (NodeStrAttrs.IsKey(NId)) {
        TStrStrVH StrAttrVals = NodeStrAttrs.GetDat(NId);
        for (TStrStrVH::TIter it = StrAttrVals.BegI(); it < StrAttrVals.EndI(); it++) {
          TStr AttrVal = Table->AggregateVector<TStr>(it.GetDat(), AggrPolicy);
          Graph->AddStrAttrDatN(NId, AttrVal, it.GetKey());
        }
      }
    }
  }

  return Graph;
}
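
// Usage sketch: build an attributed PNEANet. Column names are illustrative;
// note that node attributes are read from Table->SrcNodeAttrV and
// Table->DstNodeAttrV (set on the table beforehand), not from the SrcAttrV
// and DstAttrV arguments, while edge attribute columns come from EdgeAttrV.
//
//   TStrV SrcAttrV, DstAttrV, EdgeAttrV;
//   EdgeAttrV.Add("Weight");
//   PNEANet Net = TSnap::ToNetwork<PNEANet>(EdgeTable, "SrcId", "DstId",
//                                           SrcAttrV, DstAttrV, EdgeAttrV, aaMean);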

template<class PGraph>
PGraph ToNetwork(PTable Table,
  const TStr& SrcCol, const TStr& DstCol, TAttrAggr AggrPolicy)
{
  TStrV V;
  return ToNetwork<PGraph>(Table, SrcCol, DstCol, V, AggrPolicy);
}

#ifdef GCC_ATOMIC

template<class PGraphMP>
PGraphMP ToGraphMP(PTable Table, const TStr& SrcCol, const TStr& DstCol) {
  // double start = omp_get_wtime();
  const TInt SrcColIdx = Table->GetColIdx(SrcCol);
  const TInt DstColIdx = Table->GetColIdx(DstCol);
  const TAttrType NodeType = Table->GetColType(SrcCol);
  Assert(NodeType == Table->GetColType(DstCol));

  const TInt NumRows = Table->NumValidRows;

  TIntV SrcCol1, DstCol1, SrcCol2, DstCol2;

  #pragma omp parallel sections num_threads(4)
  {
    #pragma omp section
    { SrcCol1.Reserve(NumRows, NumRows); }
    #pragma omp section
    { SrcCol2.Reserve(NumRows, NumRows); }
    #pragma omp section
    { DstCol1.Reserve(NumRows, NumRows); }
    #pragma omp section
    { DstCol2.Reserve(NumRows, NumRows); }
  }

  // double endResize = omp_get_wtime();
  // printf("Resize time = %f\n", endResize-start);

  TIntPrV Partitions;
  Table->GetPartitionRanges(Partitions, omp_get_max_threads());
  TInt PartitionSize = Partitions[0].GetVal2()-Partitions[0].GetVal1()+1;

  // double endPartition = omp_get_wtime();
  // printf("Partition time = %f\n", endPartition-endResize);

  omp_set_num_threads(omp_get_max_threads());
  if (NodeType == atInt) {
    #pragma omp parallel for schedule(static)
    for (int i = 0; i < Partitions.Len(); i++) {
      TRowIterator RowI(Partitions[i].GetVal1(), Table());
      TRowIterator EndI(Partitions[i].GetVal2(), Table());
      while (RowI < EndI) {
        TInt RowId = RowI.GetRowIdx();
        SrcCol1[RowId] = RowI.GetIntAttr(SrcColIdx);
        SrcCol2[RowId] = RowI.GetIntAttr(SrcColIdx);
        DstCol1[RowId] = RowI.GetIntAttr(DstColIdx);
        DstCol2[RowId] = RowI.GetIntAttr(DstColIdx);
        RowI++;
      }
    }
  }
  else if (NodeType == atStr) {
    #pragma omp parallel for schedule(static)
    for (int i = 0; i < Partitions.Len(); i++) {
      TRowIterator RowI(Partitions[i].GetVal1(), Table());
      TRowIterator EndI(Partitions[i].GetVal2(), Table());
      while (RowI < EndI) {
        TInt RowId = RowI.GetRowIdx();
        SrcCol1[RowId] = RowI.GetStrMapById(SrcColIdx);
        SrcCol2[RowId] = RowI.GetStrMapById(SrcColIdx);
        DstCol1[RowId] = RowI.GetStrMapById(DstColIdx);
        DstCol2[RowId] = RowI.GetStrMapById(DstColIdx);
        RowI++;
      }
    }
  }

  omp_set_num_threads(omp_get_max_threads());
  #pragma omp parallel
  {
    #pragma omp single nowait
    {
      #pragma omp task untied shared(SrcCol1, DstCol1)
      { TTable::QSortKeyVal(SrcCol1, DstCol1, 0, NumRows-1); }
    }
    #pragma omp single nowait
    {
      #pragma omp task untied shared(SrcCol2, DstCol2)
      { TTable::QSortKeyVal(DstCol2, SrcCol2, 0, NumRows-1); }
    }
    #pragma omp taskwait
  }

  // TTable::PSRSKeyVal(SrcCol1, DstCol1, 0, NumRows-1);
  // TTable::PSRSKeyVal(DstCol2, SrcCol2, 0, NumRows-1);

  // TInt IsS = TTable::CheckSortedKeyVal(SrcCol1, DstCol1, 0, NumRows-1);
  // TInt IsD = TTable::CheckSortedKeyVal(DstCol2, SrcCol2, 0, NumRows-1);
  // printf("IsSorted = %d %d\n", IsS.Val, IsD.Val);

  // double endSort = omp_get_wtime();
  // printf("Sort time = %f\n", endSort-endCopy);
  //return TNGraphMP::New(10, 100);

  TInt NumThreads = omp_get_max_threads();
  TInt PartSize = (NumRows/NumThreads);

  TIntV SrcOffsets, DstOffsets;
  SrcOffsets.Add(0);
  for (TInt i = 1; i < NumThreads; i++) {
    TInt CurrOffset = i * PartSize;
    while (CurrOffset < (i+1) * PartSize &&
           SrcCol1[CurrOffset-1] == SrcCol1[CurrOffset]) {
      CurrOffset++;
    }
    if (CurrOffset < (i+1) * PartSize) { SrcOffsets.Add(CurrOffset); }
  }
  SrcOffsets.Add(NumRows);

  DstOffsets.Add(0);
  for (TInt i = 1; i < NumThreads; i++) {
    TInt CurrOffset = i * PartSize;
    while (CurrOffset < (i+1) * PartSize &&
           DstCol2[CurrOffset-1] == DstCol2[CurrOffset]) {
      CurrOffset++;
    }
    if (CurrOffset < (i+1) * PartSize) { DstOffsets.Add(CurrOffset); }
  }
  DstOffsets.Add(NumRows);

  TInt SrcPartCnt = SrcOffsets.Len()-1;
  TInt DstPartCnt = DstOffsets.Len()-1;

  // for (TInt i = 0; i < SrcOffsets.Len(); i++) {
  //   printf("%d ", SrcOffsets[i].Val);
  // }
  // printf("\n");
  // for (TInt i = 0; i < DstOffsets.Len(); i++) {
  //   printf("%d ", DstOffsets[i].Val);
  // }
  // printf("\n");

  TIntV SrcNodeCounts, DstNodeCounts;
  SrcNodeCounts.Reserve(SrcPartCnt, SrcPartCnt);
  DstNodeCounts.Reserve(DstPartCnt, DstPartCnt);

  #pragma omp parallel for schedule(dynamic)
  for (int t = 0; t < SrcPartCnt+DstPartCnt; t++) {
    if (t < SrcPartCnt) {
      TInt i = t;
      if (SrcOffsets[i] != SrcOffsets[i+1]) {
        SrcNodeCounts[i] = 1;
        TInt CurrNode = SrcCol1[SrcOffsets[i]];
        for (TInt j = SrcOffsets[i]+1; j < SrcOffsets[i+1]; j++) {
          while (j < SrcOffsets[i+1] && SrcCol1[j] == CurrNode) { j++; }
          if (j < SrcOffsets[i+1]) {
            SrcNodeCounts[i]++;
            CurrNode = SrcCol1[j];
          }
        }
      }
    } else {
      TInt i = t - SrcPartCnt;
      if (DstOffsets[i] != DstOffsets[i+1]) {
        DstNodeCounts[i] = 1;
        TInt CurrNode = DstCol2[DstOffsets[i]];
        for (TInt j = DstOffsets[i]+1; j < DstOffsets[i+1]; j++) {
          while (j < DstOffsets[i+1] && DstCol2[j] == CurrNode) { j++; }
          if (j < DstOffsets[i+1]) {
            DstNodeCounts[i]++;
            CurrNode = DstCol2[j];
          }
        }
      }
    }
  }

  // for (TInt i = 0; i < SrcNodeCounts.Len(); i++) {
  //   printf("%d ", SrcNodeCounts[i].Val);
  // }
  // printf("\n");
  // for (TInt i = 0; i < DstNodeCounts.Len(); i++) {
  //   printf("%d ", DstNodeCounts[i].Val);
  // }
  // printf("\n");

  TInt TotalSrcNodes = 0;
  TIntV SrcIdOffsets;
  for (int i = 0; i < SrcPartCnt; i++) {
    SrcIdOffsets.Add(TotalSrcNodes);
    TotalSrcNodes += SrcNodeCounts[i];
  }

  TInt TotalDstNodes = 0;
  TIntV DstIdOffsets;
  for (int i = 0; i < DstPartCnt; i++) {
    DstIdOffsets.Add(TotalDstNodes);
    TotalDstNodes += DstNodeCounts[i];
  }

  // printf("Total Src = %d, Total Dst = %d\n", TotalSrcNodes.Val, TotalDstNodes.Val);

  TIntPrV SrcNodeIds, DstNodeIds;
  #pragma omp parallel sections
  {
    #pragma omp section
    { SrcNodeIds.Reserve(TotalSrcNodes, TotalSrcNodes); }
    #pragma omp section
    { DstNodeIds.Reserve(TotalDstNodes, TotalDstNodes); }
  }

  #pragma omp parallel for schedule(dynamic)
  for (int t = 0; t < SrcPartCnt+DstPartCnt; t++) {
    if (t < SrcPartCnt) {
      TInt i = t;
      if (SrcOffsets[i] != SrcOffsets[i+1]) {
        TInt CurrNode = SrcCol1[SrcOffsets[i]];
        TInt ThreadOffset = SrcIdOffsets[i];
        SrcNodeIds[ThreadOffset] = TIntPr(CurrNode, SrcOffsets[i]);
        TInt CurrCount = 1;
        for (TInt j = SrcOffsets[i]+1; j < SrcOffsets[i+1]; j++) {
          while (j < SrcOffsets[i+1] && SrcCol1[j] == CurrNode) { j++; }
          if (j < SrcOffsets[i+1]) {
            CurrNode = SrcCol1[j];
            SrcNodeIds[ThreadOffset+CurrCount] = TIntPr(CurrNode, j);
            CurrCount++;
          }
        }
      }
    } else {
      TInt i = t - SrcPartCnt;
      if (DstOffsets[i] != DstOffsets[i+1]) {
        TInt CurrNode = DstCol2[DstOffsets[i]];
        TInt ThreadOffset = DstIdOffsets[i];
        DstNodeIds[ThreadOffset] = TIntPr(CurrNode, DstOffsets[i]);
        TInt CurrCount = 1;
        for (TInt j = DstOffsets[i]+1; j < DstOffsets[i+1]; j++) {
          while (j < DstOffsets[i+1] && DstCol2[j] == CurrNode) { j++; }
          if (j < DstOffsets[i+1]) {
            CurrNode = DstCol2[j];
            DstNodeIds[ThreadOffset+CurrCount] = TIntPr(CurrNode, j);
            CurrCount++;
          }
        }
      }
    }
  }

  // double endNode = omp_get_wtime();
  // printf("Node time = %f\n", endNode-endSort);

  TIntTrV Nodes;
  Nodes.Reserve(TotalSrcNodes+TotalDstNodes);

  // double endNodeResize = omp_get_wtime();
  // printf("(NodeResize time = %f)\n", endNodeResize-endNode);

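  // merge the two sorted (node id, first-row offset) lists into triples of
  // (node id, index into SrcNodeIds, index into DstNodeIds); -1 marks a node
  // that never occurs as a source (or destination, respectively)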
  TInt i = 0, j = 0;
  while (i < TotalSrcNodes && j < TotalDstNodes) {
    if (SrcNodeIds[i].Val1 == DstNodeIds[j].Val1) {
      Nodes.Add(TIntTr(SrcNodeIds[i].Val1, i, j));
      i++;
      j++;
    } else if (SrcNodeIds[i].Val1 < DstNodeIds[j].Val1) {
      Nodes.Add(TIntTr(SrcNodeIds[i].Val1, i, -1));
      i++;
    } else {
      Nodes.Add(TIntTr(DstNodeIds[j].Val1, -1, j));
      j++;
    }
  }
  for (; i < TotalSrcNodes; i++) { Nodes.Add(TIntTr(SrcNodeIds[i].Val1, i, -1)); }
  for (; j < TotalDstNodes; j++) { Nodes.Add(TIntTr(DstNodeIds[j].Val1, -1, j)); }

  // double endMerge = omp_get_wtime();
  // printf("Merge time = %f\n", endMerge-endNode);

  TInt NumNodes = Nodes.Len();
  // printf("NumNodes = %d\n", NumNodes.Val);

  PGraphMP Graph = TNGraphMP::New(NumNodes, NumRows);
  NumThreads = 1;
  int Delta = (NumNodes+NumThreads-1)/NumThreads;

  TVec<TIntV> InVV(NumNodes);
  TVec<TIntV> OutVV(NumNodes);

  omp_set_num_threads(NumThreads);
  #pragma omp parallel for schedule(static,Delta)
  for (int m = 0; m < NumNodes; m++) {
    //double startTr = omp_get_wtime();
    //TIntV OutV, InV;
    TInt n, i, j;
    Nodes[m].GetVal(n, i, j);
    if (i >= 0) {
      TInt Offset = SrcNodeIds[i].GetVal2();
      TInt Sz = DstCol1.Len()-Offset;
      if (i < SrcNodeIds.Len()-1) { Sz = SrcNodeIds[i+1].GetVal2()-Offset; }
      //printf("OutV: %d %d %d\n", n.Val, Offset.Val, Sz.Val);
      OutVV[m].Reserve(Sz);
    }
    if (j >= 0) {
      TInt Offset = DstNodeIds[j].GetVal2();
      TInt Sz = SrcCol2.Len()-Offset;
      if (j < DstNodeIds.Len()-1) { Sz = DstNodeIds[j+1].GetVal2()-Offset; }
      //printf("InV: %d %d %d\n", n.Val, Offset.Val, Sz.Val);
      InVV[m].Reserve(Sz);
    }
    //double endTr = omp_get_wtime();
    //printf("Thread=%d, i=%d, t=%f\n", omp_get_thread_num(), m, endTr-startTr);
  }

  // double endAlloc = omp_get_wtime();
  // printf("Alloc time = %f\n", endAlloc-endMerge);

  NumThreads = omp_get_max_threads();
  Delta = (NumNodes+NumThreads-1)/(10*NumThreads);
  omp_set_num_threads(NumThreads);
  #pragma omp parallel for schedule(dynamic)
  for (int m = 0; m < NumNodes; m++) {
    //double startTr = omp_get_wtime();
    //TIntV OutV, InV;
    TInt n, i, j;
    Nodes[m].GetVal(n, i, j);
    if (i >= 0) {
      TInt Offset = SrcNodeIds[i].GetVal2();
      TInt Sz = DstCol1.Len()-Offset;
      if (i < SrcNodeIds.Len()-1) { Sz = SrcNodeIds[i+1].GetVal2()-Offset; }
      //printf("OutV: %d %d %d\n", n.Val, Offset.Val, Sz.Val);
      OutVV[m].CopyUniqueFrom(DstCol1, Offset, Sz);
    }
    if (j >= 0) {
      TInt Offset = DstNodeIds[j].GetVal2();
      TInt Sz = SrcCol2.Len()-Offset;
      if (j < DstNodeIds.Len()-1) { Sz = DstNodeIds[j+1].GetVal2()-Offset; }
      //printf("InV: %d %d %d\n", n.Val, Offset.Val, Sz.Val);
      InVV[m].CopyUniqueFrom(SrcCol2, Offset, Sz);
    }
    Graph->AddNodeWithEdges(n, InVV[m], OutVV[m]);
    //double endTr = omp_get_wtime();
    //printf("Thread=%d, i=%d, t=%f\n", omp_get_thread_num(), m, endTr-startTr);
  }
  Graph->SetNodes(NumNodes);

  // double endAdd = omp_get_wtime();
  // printf("Add time = %f\n", endAdd-endAlloc);

  return Graph;
}
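
// Usage sketch: the parallel builder requires an OpenMP build with GCC_ATOMIC
// defined and a multi-threaded graph type; otherwise it is a drop-in
// replacement for ToGraph on integer- or string-keyed tables.
//
//   PNGraphMP G = TSnap::ToGraphMP<PNGraphMP>(EdgeTable, "SrcId", "DstId");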

template<class PGraphMP>
PGraphMP ToGraphMP3(PTable Table, const TStr& SrcCol, const TStr& DstCol) {
  PNGraphMP Graph;
  int MaxThreads = omp_get_max_threads();
  int Length, Threads, Delta, Nodes, Last;
  uint64_t NumNodesEst;
  TInt SrcColIdx, DstColIdx;
  TIntV InVec, OutVec;

  SrcColIdx = Table->GetColIdx(SrcCol);
  DstColIdx = Table->GetColIdx(DstCol);
  const TAttrType NodeType = Table->GetColType(SrcCol);
  Assert(NodeType == Table->GetColType(DstCol));

  /* estimate the number of nodes in the graph */
  int NumRows = Table->Next.Len();
  double Load = 10;
  int sz = NumRows / Load;
  int *buckets = (int *)malloc(sz * sizeof(int));

  #pragma omp parallel for
  for (int i = 0; i < sz; i++)
    buckets[i] = 0;

  if (NodeType == atInt) {
    #pragma omp parallel for
    for (int i = 0; i < NumRows; i++) {
      int vert = Table->IntCols[DstColIdx][i];
      buckets[vert % sz] = 1;
    }
  }
  else if (NodeType == atStr) {
    #pragma omp parallel for
    for (int i = 0; i < NumRows; i++) {
      int vert = (Table->StrColMaps)[DstColIdx][i];
      buckets[vert % sz] = 1;
    }
  }
  int cnt = 0;
  #pragma omp parallel for reduction(+:cnt)
  for (int i = 0; i < sz; i++) {
    if (buckets[i] == 0)
      cnt += 1;
  }

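  // linear-counting estimate of the distinct node count: with sz buckets of
  // which cnt stayed empty, the expected number of distinct keys is
  // sz * ln(sz / cnt)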
  NumNodesEst = sz * log((double)sz / cnt);
  free(buckets);

  /* loop until the node estimate is large enough */
  while (1)
  {
    Graph = TNGraphMP::New(NumNodesEst, 100);

    Length = Graph->Reserved();
    Threads = MaxThreads/2;
    Delta = (Length + Threads - 1) / Threads;

    OutVec.Gen(Length);
    InVec.Gen(Length);

    /* build the node hash table, count the size of edge lists */
    Last = NumRows;
    Nodes = 0;
    omp_set_num_threads(Threads);
    #pragma omp parallel for schedule(static, Delta)
    for (int CurrRowIdx = 0; CurrRowIdx < Last; CurrRowIdx++) {
      if ((uint64_t) Nodes + 1000 >= NumNodesEst) {
        /* need a bigger hash table */
        continue;
      }

      TInt SVal, DVal;
      if (NodeType == atInt) {
        SVal = Table->IntCols[SrcColIdx][CurrRowIdx];
        DVal = Table->IntCols[DstColIdx][CurrRowIdx];
      }
      else if (NodeType == atStr) {
        SVal = (Table->StrColMaps)[SrcColIdx][CurrRowIdx];
        DVal = (Table->StrColMaps)[DstColIdx][CurrRowIdx];
      }
      int SrcIdx = abs((SVal.GetPrimHashCd()) % Length);
      if (!Graph->AddOutEdge1(SrcIdx, SVal, DVal)) {
        #pragma omp critical
        {
          Nodes++;
        }
      }
      __sync_fetch_and_add(&OutVec[SrcIdx].Val, 1);

      int DstIdx = abs((DVal.GetPrimHashCd()) % Length);
      if (!Graph->AddInEdge1(DstIdx, SVal, DVal)) {
        #pragma omp critical
        {
          Nodes++;
        }
      }
      __sync_fetch_and_add(&InVec[DstIdx].Val, 1);
    }
    if ((uint64_t) Nodes + 1000 >= NumNodesEst) {
      /* the estimate was too low; double it and retry */
      Graph.Clr();
      InVec.Clr();
      OutVec.Clr();
      NumNodesEst *= 2;
    }
    else {
      break;
    }
  }

  Graph->SetNodes(Nodes);

  uint Edges = 0;
  for (int i = 0; i < Length; i++) {
    Edges += OutVec[i] + InVec[i];
  }

  for (int Idx = 0; Idx < Length; Idx++) {
    if (OutVec[Idx] > 0 || InVec[Idx] > 0) {
      Graph->ReserveNodeDegs(Idx, InVec[Idx], OutVec[Idx]);
    }
  }

  /* assign edges */
  Length = Graph->Reserved();
  Threads = MaxThreads;
  Delta = (Length + Threads - 1) / Threads;

  omp_set_num_threads(Threads);
  #pragma omp parallel for schedule(static,Delta)
  for (int CurrRowIdx = 0; CurrRowIdx < Last; CurrRowIdx++) {
    TInt SVal, DVal;
    if (NodeType == atInt) {
      SVal = Table->IntCols[SrcColIdx][CurrRowIdx];
      DVal = Table->IntCols[DstColIdx][CurrRowIdx];
    }
    else if (NodeType == atStr) {
      SVal = (Table->StrColMaps)[SrcColIdx][CurrRowIdx];
      DVal = (Table->StrColMaps)[DstColIdx][CurrRowIdx];
    }

    Graph->AddOutEdge2(SVal, DVal);
    Graph->AddInEdge2(SVal, DVal);
  }

  /* sort edges */
  Length = Graph->Reserved();
  Threads = MaxThreads*2;
  Delta = (Length + Threads - 1) / Threads;

  omp_set_num_threads(Threads);
  #pragma omp parallel for schedule(dynamic)
  for (int Idx = 0; Idx < Length; Idx++) {
    if (OutVec[Idx] > 0 || InVec[Idx] > 0) {
      Graph->SortEdges(Idx, InVec[Idx], OutVec[Idx]);
    }
  }

  return Graph;
}
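
// Usage sketch: ToGraphMP3 trades the sort-based construction of ToGraphMP for
// a hash-table build with atomically updated degree counters, retrying with a
// doubled estimate whenever the distinct-node guess proves too small.
//
//   PNGraphMP G = TSnap::ToGraphMP3<PNGraphMP>(EdgeTable, "SrcId", "DstId");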

template<class PGraphMP>
inline PGraphMP ToNetworkMP(PTable Table,
  const TStr& SrcCol, const TStr& DstCol,
  TStrV& SrcAttrV, TStrV& DstAttrV, TStrV& EdgeAttrV,
  TAttrAggr AggrPolicy) {
  TStopwatch* Sw = TStopwatch::GetInstance(); // timing hooks used below; assumed TStopwatch singleton accessor
  const TInt SrcColIdx = Table->GetColIdx(SrcCol);
  const TInt DstColIdx = Table->GetColIdx(DstCol);
  const TInt NumRows = Table->GetNumValidRows();

  const TAttrType NodeType = Table->GetColType(SrcCol);
  Assert(NodeType == Table->GetColType(DstCol));

  TIntV SrcCol1, EdgeCol1, EdgeCol2, DstCol2;

  THash<TInt, TStrIntVH> NodeIntAttrs;
  THash<TInt, TStrFltVH> NodeFltAttrs;
  THash<TInt, TStrStrVH> NodeStrAttrs;

  #pragma omp parallel sections num_threads(4)
  {
    #pragma omp section
    { SrcCol1.Reserve(NumRows, NumRows); }
    #pragma omp section
    { EdgeCol1.Reserve(NumRows, NumRows); }
    #pragma omp section
    { DstCol2.Reserve(NumRows, NumRows); }
    #pragma omp section
    { EdgeCol2.Reserve(NumRows, NumRows); }
  }

  TIntPrV Partitions;
  Table->GetPartitionRanges(Partitions, omp_get_max_threads());
  TInt PartitionSize = Partitions[0].GetVal2()-Partitions[0].GetVal1()+1;

  // double endPartition = omp_get_wtime();
  // printf("Partition time = %f\n", endPartition-endResize);

  omp_set_num_threads(omp_get_max_threads());
  if (NodeType == atInt) {
    #pragma omp parallel for schedule(static)
    for (int i = 0; i < Partitions.Len(); i++) {
      TRowIterator RowI(Partitions[i].GetVal1(), Table());
      TRowIterator EndI(Partitions[i].GetVal2(), Table());
      while (RowI < EndI) {
        TInt RowId = RowI.GetRowIdx();
        SrcCol1[RowId] = RowI.GetIntAttr(SrcColIdx);
        EdgeCol1[RowId] = RowId;
        DstCol2[RowId] = RowI.GetIntAttr(DstColIdx);
        EdgeCol2[RowId] = RowId;
        RowI++;
      }
    }
  }
  else if (NodeType == atStr) {
    #pragma omp parallel for schedule(static)
    for (int i = 0; i < Partitions.Len(); i++) {
      TRowIterator RowI(Partitions[i].GetVal1(), Table());
      TRowIterator EndI(Partitions[i].GetVal2(), Table());
      while (RowI < EndI) {
        TInt RowId = RowI.GetRowIdx();
        SrcCol1[RowId] = RowI.GetStrMapById(SrcColIdx);
        EdgeCol1[RowId] = RowId;
        DstCol2[RowId] = RowI.GetStrMapById(DstColIdx);
        EdgeCol2[RowId] = RowId;
        RowI++;
      }
    }
  }

  Sw->Start(TStopwatch::Sort);
  omp_set_num_threads(omp_get_max_threads());
  #pragma omp parallel
  {
    #pragma omp single nowait
    {
      #ifndef GLib_WIN32
      #pragma omp task untied shared(SrcCol1, EdgeCol1)
      #endif
      { TTable::QSortKeyVal(SrcCol1, EdgeCol1, 0, NumRows-1); }
    }
    #pragma omp single nowait
    {
      #ifndef GLib_WIN32
      #pragma omp task untied shared(EdgeCol2, DstCol2)
      #endif
      { TTable::QSortKeyVal(DstCol2, EdgeCol2, 0, NumRows-1); }
    }
    #ifndef GLib_WIN32
    #pragma omp taskwait
    #endif
  }
  Sw->Stop(TStopwatch::Sort);

  Sw->Start(TStopwatch::Group);
  TInt NumThreads = omp_get_max_threads();
  TInt PartSize = (NumRows/NumThreads);

  // Find the offsets of all partitions, each of which contains a list of rows;
  // rows with the same source (or destination) node are kept within one partition.
  TIntV SrcOffsets, DstOffsets;
  SrcOffsets.Add(0);
  for (TInt i = 1; i < NumThreads; i++) {
    TInt CurrOffset = i * PartSize;
    while (CurrOffset < (i+1) * PartSize &&
           SrcCol1[CurrOffset-1] == SrcCol1[CurrOffset]) {
      // ensure that rows from the same source are grouped together
      CurrOffset++;
    }
    if (CurrOffset < (i+1) * PartSize) { SrcOffsets.Add(CurrOffset); }
  }
  SrcOffsets.Add(NumRows);

  DstOffsets.Add(0);
  for (TInt i = 1; i < NumThreads; i++) {
    TInt CurrOffset = i * PartSize;
    while (CurrOffset < (i+1) * PartSize &&
           DstCol2[CurrOffset-1] == DstCol2[CurrOffset]) {
      // ensure that rows to the same destination are grouped together
      CurrOffset++;
    }
    if (CurrOffset < (i+1) * PartSize) { DstOffsets.Add(CurrOffset); }
  }
  DstOffsets.Add(NumRows);

  TInt SrcPartCnt = SrcOffsets.Len()-1; // number of source partitions
  TInt DstPartCnt = DstOffsets.Len()-1; // number of destination partitions

  // count the number of distinct source and destination nodes in each partition
  TIntV SrcNodeCounts, DstNodeCounts;
  SrcNodeCounts.Reserve(SrcPartCnt, SrcPartCnt);
  DstNodeCounts.Reserve(DstPartCnt, DstPartCnt);

  #pragma omp parallel for schedule(dynamic)
  for (int t = 0; t < SrcPartCnt+DstPartCnt; t++) {
    if (t < SrcPartCnt) {
      TInt i = t;
      if (SrcOffsets[i] != SrcOffsets[i+1]) {
        SrcNodeCounts[i] = 1;
        TInt CurrNode = SrcCol1[SrcOffsets[i]];
        for (TInt j = SrcOffsets[i]+1; j < SrcOffsets[i+1]; j++) {
          while (j < SrcOffsets[i+1] && SrcCol1[j] == CurrNode) { j++; }
          if (j < SrcOffsets[i+1]) {
            SrcNodeCounts[i]++;
            CurrNode = SrcCol1[j];
          }
        }
      }
    } else {
      TInt i = t - SrcPartCnt;
      if (DstOffsets[i] != DstOffsets[i+1]) {
        DstNodeCounts[i] = 1;
        TInt CurrNode = DstCol2[DstOffsets[i]];
        for (TInt j = DstOffsets[i]+1; j < DstOffsets[i+1]; j++) {
          while (j < DstOffsets[i+1] && DstCol2[j] == CurrNode) { j++; }
          if (j < DstOffsets[i+1]) {
            DstNodeCounts[i]++;
            CurrNode = DstCol2[j];
          }
        }
      }
    }
  }

  TInt TotalSrcNodes = 0;
  TIntV SrcIdOffsets;
  for (int i = 0; i < SrcPartCnt; i++) {
    SrcIdOffsets.Add(TotalSrcNodes);
    TotalSrcNodes += SrcNodeCounts[i];
  }

  TInt TotalDstNodes = 0;
  TIntV DstIdOffsets;
  for (int i = 0; i < DstPartCnt; i++) {
    DstIdOffsets.Add(TotalDstNodes);
    TotalDstNodes += DstNodeCounts[i];
  }

  // printf("Total Src = %d, Total Dst = %d\n", TotalSrcNodes.Val, TotalDstNodes.Val);

  // build vectors of (node id, start offset), where the start offset is the
  // index of the first row with that node id
  TIntPrV SrcNodeIds, DstNodeIds;
  #pragma omp parallel sections
  {
    #pragma omp section
    { SrcNodeIds.Reserve(TotalSrcNodes, TotalSrcNodes); }
    #pragma omp section
    { DstNodeIds.Reserve(TotalDstNodes, TotalDstNodes); }
  }

  // find the starting offset of each node (in both src and dst)
  #pragma omp parallel for schedule(dynamic)
  for (int t = 0; t < SrcPartCnt+DstPartCnt; t++) {
    if (t < SrcPartCnt) {
      TInt i = t;
      if (SrcOffsets[i] != SrcOffsets[i+1]) {
        TInt CurrNode = SrcCol1[SrcOffsets[i]];
        TInt ThreadOffset = SrcIdOffsets[i];
        SrcNodeIds[ThreadOffset] = TIntPr(CurrNode, SrcOffsets[i]);
        TInt CurrCount = 1;
        for (TInt j = SrcOffsets[i]+1; j < SrcOffsets[i+1]; j++) {
          while (j < SrcOffsets[i+1] && SrcCol1[j] == CurrNode) { j++; }
          if (j < SrcOffsets[i+1]) {
            CurrNode = SrcCol1[j];
            SrcNodeIds[ThreadOffset+CurrCount] = TIntPr(CurrNode, j);
            CurrCount++;
          }
        }
      }
    } else {
      TInt i = t - SrcPartCnt;
      if (DstOffsets[i] != DstOffsets[i+1]) {
        TInt CurrNode = DstCol2[DstOffsets[i]];
        TInt ThreadOffset = DstIdOffsets[i];
        DstNodeIds[ThreadOffset] = TIntPr(CurrNode, DstOffsets[i]);
        TInt CurrCount = 1;
        for (TInt j = DstOffsets[i]+1; j < DstOffsets[i+1]; j++) {
          while (j < DstOffsets[i+1] && DstCol2[j] == CurrNode) { j++; }
          if (j < DstOffsets[i+1]) {
            CurrNode = DstCol2[j];
            DstNodeIds[ThreadOffset+CurrCount] = TIntPr(CurrNode, j);
            CurrCount++;
          }
        }
      }
    }
  }
  Sw->Stop(TStopwatch::Group);

  // find the combined neighborhood (out-neighbors and in-neighbors) of each node
  TIntTrV Nodes;
  Nodes.Reserve(TotalSrcNodes+TotalDstNodes);

  TInt i = 0, j = 0;
  while (i < TotalSrcNodes && j < TotalDstNodes) {
    if (SrcNodeIds[i].Val1 == DstNodeIds[j].Val1) {
      Nodes.Add(TIntTr(SrcNodeIds[i].Val1, i, j));
      i++;
      j++;
    } else if (SrcNodeIds[i].Val1 < DstNodeIds[j].Val1) {
      Nodes.Add(TIntTr(SrcNodeIds[i].Val1, i, -1));
      i++;
    } else {
      Nodes.Add(TIntTr(DstNodeIds[j].Val1, -1, j));
      j++;
    }
  }
  for (; i < TotalSrcNodes; i++) { Nodes.Add(TIntTr(SrcNodeIds[i].Val1, i, -1)); }
  for (; j < TotalDstNodes; j++) { Nodes.Add(TIntTr(DstNodeIds[j].Val1, -1, j)); }

  TInt NumNodes = Nodes.Len();
  PGraphMP Graph = PGraphMP::TObj::New(NumNodes, NumRows);
//  NumThreads = omp_get_max_threads();
//  int Delta = (NumNodes+NumThreads-1)/NumThreads;

  TVec<TIntV> InVV(NumNodes);
  TVec<TIntV> OutVV(NumNodes);

//  omp_set_num_threads(NumThreads);
  #pragma omp parallel for schedule(static,100)
  for (int m = 0; m < NumNodes; m++) {
    //double startTr = omp_get_wtime();
    //TIntV OutV, InV;
    TInt n, i, j;
    Nodes[m].GetVal(n, i, j);
    if (i >= 0) {
      TInt Offset = SrcNodeIds[i].GetVal2();
      TInt Sz = EdgeCol1.Len()-Offset;
      if (i < SrcNodeIds.Len()-1) { Sz = SrcNodeIds[i+1].GetVal2()-Offset; }
      OutVV[m].Reserve(Sz);
      OutVV[m].CopyUniqueFrom(EdgeCol1, Offset, Sz);
    }
    if (j >= 0) {
      TInt Offset = DstNodeIds[j].GetVal2();
      TInt Sz = EdgeCol2.Len()-Offset;
      if (j < DstNodeIds.Len()-1) { Sz = DstNodeIds[j+1].GetVal2()-Offset; }
      InVV[m].Reserve(Sz);
      InVV[m].CopyUniqueFrom(EdgeCol2, Offset, Sz);
    }
    Graph->AddNodeWithEdges(n, InVV[m], OutVV[m]);
  }
  Graph->SetNodes(NumNodes);

  omp_set_num_threads(omp_get_max_threads());
  if (NodeType == atInt) {
    #pragma omp parallel for schedule(static)
    for (int i = 0; i < Partitions.Len(); i++) {
      TRowIterator RowI(Partitions[i].GetVal1(), Table());
      TRowIterator EndI(Partitions[i].GetVal2(), Table());
      while (RowI < EndI) {
        TInt RowId = RowI.GetRowIdx(); // the row id doubles as the edge id
        TInt SrcId = RowI.GetIntAttr(SrcColIdx);
        TInt DstId = RowI.GetIntAttr(DstColIdx);
        Graph->AddEdgeUnchecked(RowId, SrcId, DstId);
        RowI++;
        for (TInt ea_i = 0; ea_i < EdgeAttrV.Len(); ea_i++) {
          TStr ColName = EdgeAttrV[ea_i];
          TAttrType T = Table->GetColType(ColName);
          TInt Index = Table->GetColIdx(ColName);
          switch (T) {
            case atInt:
              Graph->AddIntAttrDatE(RowId, Table->IntCols[Index][RowId], ColName);
              break;
            case atFlt:
              Graph->AddFltAttrDatE(RowId, Table->FltCols[Index][RowId], ColName);
              break;
            case atStr:
              Graph->AddStrAttrDatE(RowId, Table->GetStrValIdx(Index, RowId), ColName);
              break;
          }
        }
        if ((Table->SrcNodeAttrV).Len() > 0) {
          Table->AddNodeAttributes(SrcId, Table->SrcNodeAttrV, RowId, NodeIntAttrs, NodeFltAttrs, NodeStrAttrs);
        }

        if ((Table->DstNodeAttrV).Len() > 0) {
          Table->AddNodeAttributes(DstId, Table->DstNodeAttrV, RowId, NodeIntAttrs, NodeFltAttrs, NodeStrAttrs);
        }
      }
    }
  }
  else if (NodeType == atStr) {
    #pragma omp parallel for schedule(static)
    for (int i = 0; i < Partitions.Len(); i++) {
      TRowIterator RowI(Partitions[i].GetVal1(), Table());
      TRowIterator EndI(Partitions[i].GetVal2(), Table());
      while (RowI < EndI) {
        TInt RowId = RowI.GetRowIdx(); // the row id doubles as the edge id
        TInt SrcId = RowI.GetStrMapById(SrcColIdx);
        TInt DstId = RowI.GetStrMapById(DstColIdx);
        Graph->AddEdgeUnchecked(RowId, SrcId, DstId);
        RowI++;
        for (TInt ea_i = 0; ea_i < EdgeAttrV.Len(); ea_i++) {
          TStr ColName = EdgeAttrV[ea_i];
          TAttrType T = Table->GetColType(ColName);
          TInt Index = Table->GetColIdx(ColName);
          switch (T) {
            case atInt:
              Graph->AddIntAttrDatE(RowId, Table->IntCols[Index][RowId], ColName);
              break;
            case atFlt:
              Graph->AddFltAttrDatE(RowId, Table->FltCols[Index][RowId], ColName);
              break;
            case atStr:
              Graph->AddStrAttrDatE(RowId, Table->GetStrValIdx(Index, RowId), ColName);
              break;
          }
        }
        if ((Table->SrcNodeAttrV).Len() > 0) {
          Table->AddNodeAttributes(SrcId, Table->SrcNodeAttrV, RowId, NodeIntAttrs, NodeFltAttrs, NodeStrAttrs);
        }

        if ((Table->DstNodeAttrV).Len() > 0) {
          Table->AddNodeAttributes(DstId, Table->DstNodeAttrV, RowId, NodeIntAttrs, NodeFltAttrs, NodeStrAttrs);
        }
      }
    }
  }

  // aggregate node attributes and add them to the graph
  if ((Table->SrcNodeAttrV).Len() > 0 || (Table->DstNodeAttrV).Len() > 0) {
    for (typename PGraphMP::TObj::TNodeI NodeI = Graph->BegNI(); NodeI < Graph->EndNI(); NodeI++) {
      TInt NId = NodeI.GetId();
      if (NodeIntAttrs.IsKey(NId)) {
        TStrIntVH IntAttrVals = NodeIntAttrs.GetDat(NId);
        for (TStrIntVH::TIter it = IntAttrVals.BegI(); it < IntAttrVals.EndI(); it++) {
          TInt AttrVal = Table->AggregateVector<TInt>(it.GetDat(), AggrPolicy);
          Graph->AddIntAttrDatN(NId, AttrVal, it.GetKey());
        }
      }
      if (NodeFltAttrs.IsKey(NId)) {
        TStrFltVH FltAttrVals = NodeFltAttrs.GetDat(NId);
        for (TStrFltVH::TIter it = FltAttrVals.BegI(); it < FltAttrVals.EndI(); it++) {
          TFlt AttrVal = Table->AggregateVector<TFlt>(it.GetDat(), AggrPolicy);
          Graph->AddFltAttrDatN(NId, AttrVal, it.GetKey());
        }
      }
      if (NodeStrAttrs.IsKey(NId)) {
        TStrStrVH StrAttrVals = NodeStrAttrs.GetDat(NId);
        for (TStrStrVH::TIter it = StrAttrVals.BegI(); it < StrAttrVals.EndI(); it++) {
          TStr AttrVal = Table->AggregateVector<TStr>(it.GetDat(), AggrPolicy);
          Graph->AddStrAttrDatN(NId, AttrVal, it.GetKey());
        }
      }
    }
  }

  Graph->SetEdges(NumRows);

  return Graph;
}
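
// Usage sketch: parallel construction of an attributed PNEANetMP. As with the
// sequential ToNetwork, node attribute columns are taken from
// Table->SrcNodeAttrV and Table->DstNodeAttrV rather than from the SrcAttrV
// and DstAttrV arguments.
//
//   PNEANetMP Net = TSnap::ToNetworkMP<PNEANetMP>(EdgeTable, "SrcId", "DstId",
//                                                 SrcAttrV, DstAttrV, EdgeAttrV, aaFirst);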

template<class PGraphMP>
PGraphMP ToNetworkMP(PTable Table,
  const TStr& SrcCol, const TStr& DstCol, TAttrAggr AggrPolicy)
{
  TStrV V;
  return ToNetworkMP<PGraphMP>(Table, SrcCol, DstCol, V, AggrPolicy);
}

template<class PGraphMP>
inline PGraphMP ToNetworkMP2(PTable Table,
  const TStr& SrcCol, const TStr& DstCol,
  TStrV& SrcAttrV, TStrV& DstAttrV, TStrV& EdgeAttrV,
  TAttrAggr AggrPolicy) {
  TStopwatch* Sw = TStopwatch::GetInstance(); // timing hooks used below; assumed TStopwatch singleton accessor
  const TInt SrcColIdx = Table->GetColIdx(SrcCol);
  const TInt DstColIdx = Table->GetColIdx(DstCol);
  const TInt NumRows = Table->NumValidRows;

  const TAttrType NodeType = Table->GetColType(SrcCol);
  Assert(NodeType == Table->GetColType(DstCol));

  TIntV SrcCol1, EdgeCol1, EdgeCol2, DstCol2;

  #pragma omp parallel sections num_threads(4)
  {
    #pragma omp section
    { SrcCol1.Reserve(NumRows, NumRows); }
    #pragma omp section
    { EdgeCol1.Reserve(NumRows, NumRows); }
    #pragma omp section
    { DstCol2.Reserve(NumRows, NumRows); }
    #pragma omp section
    { EdgeCol2.Reserve(NumRows, NumRows); }
  }

  TIntPrV Partitions;
//  int NThreads = omp_get_max_threads();
  const int NThreads = 40;
  Table->GetPartitionRanges(Partitions, NThreads);
  TInt PartitionSize = Partitions[0].GetVal2()-Partitions[0].GetVal1()+1;

  // double endPartition = omp_get_wtime();
  // printf("Partition time = %f\n", endPartition-endResize);

  if (NodeType == atInt) {
    #pragma omp parallel for schedule(static)
    for (int i = 0; i < Partitions.Len(); i++) {
      TRowIterator RowI(Partitions[i].GetVal1(), Table());
      TRowIterator EndI(Partitions[i].GetVal2(), Table());
      while (RowI < EndI) {
        TInt RowId = RowI.GetRowIdx();
        SrcCol1[RowId] = RowI.GetIntAttr(SrcColIdx);
        EdgeCol1[RowId] = RowId;
        DstCol2[RowId] = RowI.GetIntAttr(DstColIdx);
        EdgeCol2[RowId] = RowId;
        RowI++;
      }
    }
  }
  else if (NodeType == atStr) {
    #pragma omp parallel for schedule(static)
    for (int i = 0; i < Partitions.Len(); i++) {
      TRowIterator RowI(Partitions[i].GetVal1(), Table());
      TRowIterator EndI(Partitions[i].GetVal2(), Table());
      while (RowI < EndI) {
        TInt RowId = RowI.GetRowIdx();
        SrcCol1[RowId] = RowI.GetStrMapById(SrcColIdx);
        EdgeCol1[RowId] = RowId;
        DstCol2[RowId] = RowI.GetStrMapById(DstColIdx);
        EdgeCol2[RowId] = RowId;
        RowI++;
      }
    }
  }

//  printf("NumRows = %d\n", NumRows.Val);
//  printf("NThreads = %d\n", NThreads);
//  for (int i = 0; i < Partitions.Len(); i++) {
//    printf("Partition %d %d->%d\n", i, Partitions[i].GetVal1().Val, Partitions[i].GetVal2().Val);
//  }
  int Parts[NThreads+1];
  for (int i = 0; i < NThreads; i++) {
    Parts[i] = NumRows.Val / NThreads * i;
  }
  Parts[NThreads] = NumRows;
//  for (int i = 0; i < NThreads+1; i++) {
//    printf("Parts[%d] = %d\n", i, Parts[i]);
//  }

  Sw->Start(TStopwatch::Sort);
  TInt ExtremePoints[4][NThreads];
  omp_set_num_threads(omp_get_max_threads());
  #pragma omp parallel
  {
    #pragma omp for schedule(static) nowait
    for (int i = 0; i < NThreads; i++) {
      TInt StartPos = Parts[i];
      TInt EndPos = Parts[i+1]-1;
      // TODO: handle an empty partition
      TTable::QSortKeyVal(SrcCol1, EdgeCol1, StartPos, EndPos);
      ExtremePoints[0][i] = SrcCol1[StartPos];
      ExtremePoints[2][i] = SrcCol1[EndPos];
    }
    #pragma omp for schedule(static) nowait
    for (int i = 0; i < NThreads; i++) {
      TInt StartPos = Parts[i];
      TInt EndPos = Parts[i+1]-1;
      // TODO: handle an empty partition
      TTable::QSortKeyVal(DstCol2, EdgeCol2, StartPos, EndPos);
      ExtremePoints[1][i] = DstCol2[StartPos];
      ExtremePoints[3][i] = DstCol2[EndPos];
    }
  }
//  for (int i = 0; i < NThreads; i++) {
//    printf("ExtremePoints[%d] = %d-%d -> %d-%d\n", i, ExtremePoints[0][i].Val, ExtremePoints[1][i].Val, ExtremePoints[2][i].Val, ExtremePoints[3][i].Val);
//  }

  // find the minimum and maximum node ids across all sorted runs
  TInt MinId(INT_MAX);
  for (int j = 0; j < 2; j++) {
    for (int i = 0; i < NThreads; i++) {
      if (MinId > ExtremePoints[j][i]) { MinId = ExtremePoints[j][i]; }
    }
  }
  TInt MaxId(-1);
  for (int j = 2; j < 4; j++) {
    for (int i = 0; i < NThreads; i++) {
      if (MaxId < ExtremePoints[j][i]) { MaxId = ExtremePoints[j][i]; }
    }
  }
//  printf("MinId = %d\n", MinId.Val);
//  printf("MaxId = %d\n", MaxId.Val);
  Sw->Stop(TStopwatch::Sort);

  Sw->Start(TStopwatch::Group);
//  const int NumCollectors = omp_get_max_threads();
  const int NumCollectors = 20;
  int Range = MaxId.Val - MinId.Val;
  TIntV IdRanges(NumCollectors+1);
  for (int j = 0; j < NumCollectors; j++) {
    IdRanges[j] = MinId + Range/NumCollectors*j;
  }
  IdRanges[NumCollectors] = MaxId+1;
//  for (int i = 0; i < NumCollectors+1; i++) {
//    printf("IdRanges[%d] = %d\n", i, IdRanges[i].Val);
//  }

  int SrcOffsets[NThreads][NumCollectors+1];
  #pragma omp parallel for schedule(static)
  for (int i = 0; i < NThreads; i++) {
    int CollectorId = 0;
    for (int j = Parts[i]; j < Parts[i+1]; j++) {
      while (SrcCol1[j] >= IdRanges[CollectorId]) {
        SrcOffsets[i][CollectorId++] = j;
      }
    }
    while (CollectorId <= NumCollectors) {
      SrcOffsets[i][CollectorId++] = Parts[i+1];
    }
  }
  int DstOffsets[NThreads][NumCollectors+1];
  #pragma omp parallel for schedule(static)
  for (int i = 0; i < NThreads; i++) {
    int CollectorId = 0;
    for (int j = Parts[i]; j < Parts[i+1]; j++) {
      while (DstCol2[j] >= IdRanges[CollectorId]) {
        DstOffsets[i][CollectorId++] = j;
      }
    }
    while (CollectorId <= NumCollectors) {
      DstOffsets[i][CollectorId++] = Parts[i+1];
    }
  }
//  for (int i = 0; i < NThreads; i++) {
//    for (int j = 0; j < NumCollectors+1; j++) {
//      printf("SrcOffsets[%d][%d] = %d\n", i, j, SrcOffsets[i][j]);
//    }
//  }
//  for (int i = 0; i < NThreads; i++) {
//    for (int j = 0; j < NumCollectors+1; j++) {
//      printf("DstOffsets[%d][%d] = %d\n", i, j, DstOffsets[i][j]);
//    }
//  }

  TIntV SrcCollectorOffsets(NumCollectors+1);
  SrcCollectorOffsets[0] = 0;
  for (int k = 0; k < NumCollectors; k++) {
    int SumOffset = 0;
    for (int i = 0; i < NThreads; i++) {
      SumOffset += SrcOffsets[i][k+1] - SrcOffsets[i][k];
    }
    SrcCollectorOffsets[k+1] = SrcCollectorOffsets[k] + SumOffset;
  }
  TIntV DstCollectorOffsets(NumCollectors+1);
  DstCollectorOffsets[0] = 0;
  for (int k = 0; k < NumCollectors; k++) {
    int SumOffset = 0;
    for (int i = 0; i < NThreads; i++) {
      SumOffset += DstOffsets[i][k+1] - DstOffsets[i][k];
    }
    DstCollectorOffsets[k+1] = DstCollectorOffsets[k] + SumOffset;
  }
//  for (int i = 0; i < NumCollectors+1; i++) {
//    printf("SrcCollectorOffsets[%d] = %d\n", i, SrcCollectorOffsets[i].Val);
//  }
//  for (int i = 0; i < NumCollectors+1; i++) {
//    printf("DstCollectorOffsets[%d] = %d\n", i, DstCollectorOffsets[i].Val);
//  }

  TIntV SrcCol3, EdgeCol3, EdgeCol4, DstCol4;
  #pragma omp parallel sections num_threads(4)
  {
    #pragma omp section
    { SrcCol3.Reserve(NumRows, NumRows); }
    #pragma omp section
    { EdgeCol3.Reserve(NumRows, NumRows); }
    #pragma omp section
    { DstCol4.Reserve(NumRows, NumRows); }
    #pragma omp section
    { EdgeCol4.Reserve(NumRows, NumRows); }
  }

  TIntV SrcNodeCounts(NumCollectors), DstNodeCounts(NumCollectors);
  #pragma omp parallel for schedule(static)
  for (int k = 0; k < NumCollectors; k++) {
    int ind = SrcCollectorOffsets[k];
    for (int i = 0; i < NThreads; i++) {
      for (int j = SrcOffsets[i][k]; j < SrcOffsets[i][k+1]; j++) {
        SrcCol3[ind] = SrcCol1[j];
        EdgeCol3[ind] = EdgeCol1[j];
        ind++;
      }
    }
    TTable::QSortKeyVal(SrcCol3, EdgeCol3, SrcCollectorOffsets[k], SrcCollectorOffsets[k+1]-1);
    int SrcCount = 0;
    if (SrcCollectorOffsets[k+1] > SrcCollectorOffsets[k]) {
      SrcCount = 1;
      for (int j = SrcCollectorOffsets[k]+1; j < SrcCollectorOffsets[k+1]; j++) {
        if (SrcCol3[j] != SrcCol3[j-1]) { SrcCount++; }
      }
    }
    SrcNodeCounts[k] = SrcCount;

    ind = DstCollectorOffsets[k];
    for (int i = 0; i < NThreads; i++) {
      for (int j = DstOffsets[i][k]; j < DstOffsets[i][k+1]; j++) {
        DstCol4[ind] = DstCol2[j];
        EdgeCol4[ind] = EdgeCol2[j];
        ind++;
      }
    }
    TTable::QSortKeyVal(DstCol4, EdgeCol4, DstCollectorOffsets[k], DstCollectorOffsets[k+1]-1);
    int DstCount = 0;
    if (DstCollectorOffsets[k+1] > DstCollectorOffsets[k]) {
      DstCount = 1;
      for (int j = DstCollectorOffsets[k]+1; j < DstCollectorOffsets[k+1]; j++) {
        if (DstCol4[j] != DstCol4[j-1]) { DstCount++; }
      }
    }
    DstNodeCounts[k] = DstCount;
  }

  TInt TotalSrcNodes = 0;
  TIntV SrcIdOffsets;
  for (int i = 0; i < NumCollectors; i++) {
    SrcIdOffsets.Add(TotalSrcNodes);
    TotalSrcNodes += SrcNodeCounts[i];
  }

//  printf("Sorted = %d - %d\n", SrcCol3.IsSorted(), DstCol4.IsSorted());
//  for (int i = 0; i < NumRows-1; i++) {
//    if (SrcCol3[i] > SrcCol3[i+1]) { printf("i=%d: %d %d\n", i, SrcCol3[i].Val, SrcCol3[i+1].Val); }
//  }
//  for (int i = 0; i < NumRows-1; i++) {
//    if (DstCol4[i] > DstCol4[i+1]) { printf("i=%d: %d %d\n", i, DstCol4[i].Val, DstCol4[i+1].Val); }
//  }

  TInt TotalDstNodes = 0;
  TIntV DstIdOffsets;
  for (int i = 0; i < NumCollectors; i++) {
    DstIdOffsets.Add(TotalDstNodes);
    TotalDstNodes += DstNodeCounts[i];
  }

  // build vectors of (node id, start offset), where the start offset is the
  // index of the first row with that node id
  TIntPrV SrcNodeIds, DstNodeIds;
  #pragma omp parallel sections
  {
    #pragma omp section
    { SrcNodeIds.Reserve(TotalSrcNodes, TotalSrcNodes); }
    #pragma omp section
    { DstNodeIds.Reserve(TotalDstNodes, TotalDstNodes); }
  }

  // find the starting offset of each node (in both src and dst)
  #pragma omp parallel for schedule(dynamic)
  for (int t = 0; t < 2*NumCollectors; t++) {
    if (t < NumCollectors) {
      TInt i = t;
      if (SrcCollectorOffsets[i] < SrcCollectorOffsets[i+1]) {
        TInt CurrNode = SrcCol3[SrcCollectorOffsets[i]];
        TInt ThreadOffset = SrcIdOffsets[i];
        SrcNodeIds[ThreadOffset] = TIntPr(CurrNode, SrcCollectorOffsets[i]);
        TInt CurrCount = 1;
        for (TInt j = SrcCollectorOffsets[i]+1; j < SrcCollectorOffsets[i+1]; j++) {
          while (j < SrcCollectorOffsets[i+1] && SrcCol3[j] == CurrNode) { j++; }
          if (j < SrcCollectorOffsets[i+1]) {
            CurrNode = SrcCol3[j];
            SrcNodeIds[ThreadOffset+CurrCount] = TIntPr(CurrNode, j);
            CurrCount++;
          }
        }
      }
    } else {
      TInt i = t - NumCollectors;
      if (DstCollectorOffsets[i] < DstCollectorOffsets[i+1]) {
        TInt CurrNode = DstCol4[DstCollectorOffsets[i]];
        TInt ThreadOffset = DstIdOffsets[i];
        DstNodeIds[ThreadOffset] = TIntPr(CurrNode, DstCollectorOffsets[i]);
        TInt CurrCount = 1;
        for (TInt j = DstCollectorOffsets[i]+1; j < DstCollectorOffsets[i+1]; j++) {
          while (j < DstCollectorOffsets[i+1] && DstCol4[j] == CurrNode) { j++; }
          if (j < DstCollectorOffsets[i+1]) {
            CurrNode = DstCol4[j];
            DstNodeIds[ThreadOffset+CurrCount] = TIntPr(CurrNode, j);
            CurrCount++;
          }
        }
      }
    }
  }
  Sw->Stop(TStopwatch::Group);

  // find the combined neighborhood (out-neighbors and in-neighbors) of each node
  TIntTrV Nodes;
  Nodes.Reserve(TotalSrcNodes+TotalDstNodes);

  TInt i = 0, j = 0;
  while (i < TotalSrcNodes && j < TotalDstNodes) {
    if (SrcNodeIds[i].Val1 == DstNodeIds[j].Val1) {
      Nodes.Add(TIntTr(SrcNodeIds[i].Val1, i, j));
      i++;
      j++;
    } else if (SrcNodeIds[i].Val1 < DstNodeIds[j].Val1) {
      Nodes.Add(TIntTr(SrcNodeIds[i].Val1, i, -1));
      i++;
    } else {
      Nodes.Add(TIntTr(DstNodeIds[j].Val1, -1, j));
      j++;
    }
  }
  for (; i < TotalSrcNodes; i++) { Nodes.Add(TIntTr(SrcNodeIds[i].Val1, i, -1)); }
  for (; j < TotalDstNodes; j++) { Nodes.Add(TIntTr(DstNodeIds[j].Val1, -1, j)); }

  TInt NumNodes = Nodes.Len();
  PGraphMP Graph = PGraphMP::TObj::New(NumNodes, NumRows);
//  NumThreads = omp_get_max_threads();
//  int Delta = (NumNodes+NumThreads-1)/NumThreads;

  TVec<TIntV> InVV(NumNodes);
  TVec<TIntV> OutVV(NumNodes);

//  omp_set_num_threads(NumThreads);
  #pragma omp parallel for schedule(static,100)
  for (int m = 0; m < NumNodes; m++) {
    //double startTr = omp_get_wtime();
    //TIntV OutV, InV;
    TInt n, i, j;
    Nodes[m].GetVal(n, i, j);
    if (i >= 0) {
      TInt Offset = SrcNodeIds[i].GetVal2();
      TInt Sz = EdgeCol3.Len()-Offset;
      if (i < SrcNodeIds.Len()-1) { Sz = SrcNodeIds[i+1].GetVal2()-Offset; }
      OutVV[m].Reserve(Sz);
      OutVV[m].CopyUniqueFrom(EdgeCol3, Offset, Sz);
    }
    if (j >= 0) {
      TInt Offset = DstNodeIds[j].GetVal2();
      TInt Sz = EdgeCol4.Len()-Offset;
      if (j < DstNodeIds.Len()-1) { Sz = DstNodeIds[j+1].GetVal2()-Offset; }
      InVV[m].Reserve(Sz);
      InVV[m].CopyUniqueFrom(EdgeCol4, Offset, Sz);
    }
    Graph->AddNodeWithEdges(n, InVV[m], OutVV[m]);
  }
  Graph->SetNodes(NumNodes);

  omp_set_num_threads(omp_get_max_threads());
  if (NodeType == atInt) {
    #pragma omp parallel for schedule(static)
    for (int i = 0; i < Partitions.Len(); i++) {
      TRowIterator RowI(Partitions[i].GetVal1(), Table());
      TRowIterator EndI(Partitions[i].GetVal2(), Table());
      while (RowI < EndI) {
        TInt RowId = RowI.GetRowIdx(); // the row id doubles as the edge id
        TInt SrcId = RowI.GetIntAttr(SrcColIdx);
        TInt DstId = RowI.GetIntAttr(DstColIdx);
        Graph->AddEdgeUnchecked(RowId, SrcId, DstId);
        RowI++;
      }
    }
  }
  else if (NodeType == atStr) {
    #pragma omp parallel for schedule(static)
    for (int i = 0; i < Partitions.Len(); i++) {
      TRowIterator RowI(Partitions[i].GetVal1(), Table());
      TRowIterator EndI(Partitions[i].GetVal2(), Table());
      while (RowI < EndI) {
        TInt RowId = RowI.GetRowIdx(); // the row id doubles as the edge id
        TInt SrcId = RowI.GetStrMapById(SrcColIdx);
        TInt DstId = RowI.GetStrMapById(DstColIdx);
        Graph->AddEdgeUnchecked(RowId, SrcId, DstId);
        RowI++;
      }
    }
  }
  Graph->SetEdges(NumRows);

  // double endAdd = omp_get_wtime();
  // printf("Add time = %f\n", endAdd-endAlloc);

  return Graph;
}

template<class PGraphMP>
PGraphMP ToNetworkMP2(PTable Table,
  const TStr& SrcCol, const TStr& DstCol, TAttrAggr AggrPolicy)
{
  TStrV V;
  return ToNetworkMP2<PGraphMP>(Table, SrcCol, DstCol, V, V, V, AggrPolicy);
}
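
// Note: ToNetworkMP2 follows the same pipeline as ToNetworkMP, but replaces
// the two global task-based sorts with per-thread partition sorts whose runs
// are redistributed to fixed id-range "collectors" and sorted again, which
// avoids relying on OpenMP tasks; unlike ToNetworkMP it does not populate
// node or edge attributes.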
#endif // GCC_ATOMIC

int LoadModeNetToNet(PMMNet Graph, const TStr& Name, PTable Table, const TStr& NCol,
  TStrV& NodeAttrV);
int LoadMode(TModeNet& Graph, PTable Table, const TStr& NCol,
  TStrV& NodeAttrV);
int LoadCrossNetToNet(PMMNet Graph, const TStr& Mode1, const TStr& Mode2, const TStr& CrossName,
  PTable Table, const TStr& SrcCol, const TStr& DstCol, TStrV& EdgeAttrV);
int LoadCrossNet(TCrossNet& Graph, PTable Table, const TStr& SrcCol, const TStr& DstCol,
  TStrV& EdgeAttrV);
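
// Usage sketch for the mode/crossnet loaders above, assuming a multimodal
// network created with TMMNet::New() and tables whose key columns name the
// nodes on each side (table and column names here are illustrative):
//
//   PMMNet MMNet = TMMNet::New();
//   TStrV NodeAttrV, EdgeAttrV;
//   TSnap::LoadModeNetToNet(MMNet, "Users", UserTable, "UserId", NodeAttrV);
//   TSnap::LoadCrossNetToNet(MMNet, "Users", "Users", "Follows",
//                            FollowTable, "SrcUser", "DstUser", EdgeAttrV);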

template<class PGraph>
PGraph ToNetwork(PTable Table,
  const TStr& SrcCol, const TStr& DstCol,
  TStrV& EdgeAttrV,
  TAttrAggr AggrPolicy) {
  PGraph Graph = PGraph::TObj::New();

  const TAttrType NodeType = Table->GetColType(SrcCol);
  Assert(NodeType == Table->GetColType(DstCol));
  const TInt SrcColIdx = Table->GetColIdx(SrcCol);
  const TInt DstColIdx = Table->GetColIdx(DstCol);

  //Table->AddGraphAttributeV(SrcAttrV, false, true, false);
  //Table->AddGraphAttributeV(DstAttrV, false, false, true);
  //Table->AddGraphAttributeV(EdgeAttrV, true, false, true);

  // node values - i.e. the unique values of the src/dst columns
  //THashSet<TInt> IntNodeVals; // for both int and string node attr types.
  THash<TFlt, TInt> FltNodeVals;

  // make a single pass over all rows in the table
  for (int CurrRowIdx = 0; CurrRowIdx < (Table->Next).Len(); CurrRowIdx++) {
    if ((Table->Next)[CurrRowIdx] == Table->Invalid) {
      continue;
    }

    // add src and dst nodes to the graph if they have not been seen earlier
    TInt SVal, DVal;
    if (NodeType == atFlt) {
      TFlt FSVal = (Table->FltCols)[SrcColIdx][CurrRowIdx];
      SVal = Table->CheckAndAddFltNode(Graph, FltNodeVals, FSVal);
      TFlt FDVal = (Table->FltCols)[DstColIdx][CurrRowIdx];
      DVal = Table->CheckAndAddFltNode(Graph, FltNodeVals, FDVal);
    }
    else if (NodeType == atInt || NodeType == atStr) {
      if (NodeType == atInt) {
        SVal = (Table->IntCols)[SrcColIdx][CurrRowIdx];
        DVal = (Table->IntCols)[DstColIdx][CurrRowIdx];
      }
      else {
        SVal = (Table->StrColMaps)[SrcColIdx][CurrRowIdx];
        // if (strlen(Table->GetContextKey(SVal)) == 0) { continue; } // illegal value
        DVal = (Table->StrColMaps)[DstColIdx][CurrRowIdx];
        // if (strlen(Table->GetContextKey(DVal)) == 0) { continue; } // illegal value
      }
      if (!Graph->IsNode(SVal)) { Graph->AddNode(SVal); }
      if (!Graph->IsNode(DVal)) { Graph->AddNode(DVal); }
      //CheckAndAddIntNode(Graph, IntNodeVals, SVal);
      //CheckAndAddIntNode(Graph, IntNodeVals, DVal);
    }

    // add the edge and its attributes; the row index serves as the edge id
    Graph->AddEdge(SVal, DVal, CurrRowIdx);

    // aggregate edge attributes and add them to the graph
    for (TInt i = 0; i < EdgeAttrV.Len(); i++) {
      TStr ColName = EdgeAttrV[i];
      TAttrType T = Table->GetColType(ColName);
      TInt Index = Table->GetColIdx(ColName);
      switch (T) {
        case atInt:
          Graph->AddIntAttrDatE(CurrRowIdx, Table->IntCols[Index][CurrRowIdx], ColName);
          break;
        case atFlt:
          Graph->AddFltAttrDatE(CurrRowIdx, Table->FltCols[Index][CurrRowIdx], ColName);
          break;
        case atStr:
          Graph->AddStrAttrDatE(CurrRowIdx, Table->GetStrValIdx(Index, CurrRowIdx), ColName);
          break;
      }
    }
  }
  return Graph;
}
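
// Usage sketch: the edge-attribute-only variant above, for when node
// attributes are not needed:
//
//   TStrV EdgeAttrV;
//   EdgeAttrV.Add("Weight");
//   PNEANet Net = TSnap::ToNetwork<PNEANet>(EdgeTable, "SrcId", "DstId",
//                                           EdgeAttrV, aaFirst);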

#ifdef GCC_ATOMIC

template<class PGraphMP>
inline PGraphMP ToNetworkMP(PTable Table,
  const TStr& SrcCol, const TStr& DstCol,
  TStrV& EdgeAttrV,
  TAttrAggr AggrPolicy) {
  TStopwatch* Sw = TStopwatch::GetInstance(); // timing hooks used below; assumed TStopwatch singleton accessor
1658  const TInt SrcColIdx = Table->GetColIdx(SrcCol);
1659  const TInt DstColIdx = Table->GetColIdx(DstCol);
1660  const TInt NumRows = Table->GetNumValidRows();
1661 
1662  const TAttrType NodeType = Table->GetColType(SrcCol);
1663  Assert(NodeType == Table->GetColType(DstCol));
1664 
1665  TIntV SrcCol1, EdgeCol1, EdgeCol2, DstCol2;
1666 
1667  THash<TInt, TStrIntVH> NodeIntAttrs;
1668  THash<TInt, TStrFltVH> NodeFltAttrs;
1669  THash<TInt, TStrStrVH> NodeStrAttrs;
1670 
1671  #pragma omp parallel sections num_threads(4)
1672  {
1673  #pragma omp section
1674  { SrcCol1.Reserve(NumRows, NumRows); }
1675  #pragma omp section
1676  { EdgeCol1.Reserve(NumRows, NumRows); }
1677  #pragma omp section
1678  { DstCol2.Reserve(NumRows, NumRows); }
1679  #pragma omp section
1680  { EdgeCol2.Reserve(NumRows, NumRows); }
1681  }
1683 
1685  TIntPrV Partitions;
1686  Table->GetPartitionRanges(Partitions, omp_get_max_threads());
1687  TInt PartitionSize = Partitions[0].GetVal2()-Partitions[0].GetVal1()+1;
1688 
1689 
1690  // double endPartition = omp_get_wtime();
1691  // printf("Partition time = %f\n", endPartition-endResize);
1692 
1693  omp_set_num_threads(omp_get_max_threads());
1694  if (NodeType == atInt) {
1695  #pragma omp parallel for schedule(static)
1696  for (int i = 0; i < Partitions.Len(); i++) {
1697  TRowIterator RowI(Partitions[i].GetVal1(), Table());
1698  TRowIterator EndI(Partitions[i].GetVal2(), Table());
1699  while (RowI < EndI) {
1700  TInt RowId = RowI.GetRowIdx();
1701  SrcCol1[RowId] = RowI.GetIntAttr(SrcColIdx);
1702  EdgeCol1[RowId] = RowId;
1703  DstCol2[RowId] = RowI.GetIntAttr(DstColIdx);
1704  EdgeCol2[RowId] = RowId;
1705  RowI++;
1706  }
1707  }
1708  }
1709  else if (NodeType == atStr) {
1710  #pragma omp parallel for schedule(static)
1711  for (int i = 0; i < Partitions.Len(); i++) {
1712  TRowIterator RowI(Partitions[i].GetVal1(), Table());
1713  TRowIterator EndI(Partitions[i].GetVal2(), Table());
1714  while (RowI < EndI) {
1715  TInt RowId = RowI.GetRowIdx();
1716  SrcCol1[RowId] = RowI.GetStrMapById(SrcColIdx);
1717  EdgeCol1[RowId] = RowId;
1718  DstCol2[RowId] = RowI.GetStrMapById(DstColIdx);
1719  EdgeCol2[RowId] = RowId;
1720  RowI++;
1721  }
1722  }
1723  }
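// Worked illustration (editor's note): after the copy pass above, a three-row table
// with edges 0:(5,7), 1:(5,9), 2:(9,7) yields
//   SrcCol1 = [5, 5, 9], DstCol2 = [7, 9, 7], EdgeCol1 = EdgeCol2 = [0, 1, 2]
// (row ids double as edge ids).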
1725 
1726  Sw->Start(TStopwatch::Sort);
1727  omp_set_num_threads(omp_get_max_threads());
1728  #pragma omp parallel
1729  {
1730  #pragma omp single nowait
1731  {
1732  #ifndef GLib_WIN32
1733  #pragma omp task untied shared(SrcCol1, EdgeCol1)
1734  #endif
1735  { TTable::QSortKeyVal(SrcCol1, EdgeCol1, 0, NumRows-1); }
1736  }
1737  #pragma omp single nowait
1738  {
1739  #ifndef GLib_WIN32
1740  #pragma omp task untied shared(EdgeCol2, DstCol2)
1741  #endif
1742  { TTable::QSortKeyVal(DstCol2, EdgeCol2, 0, NumRows-1); }
1743  }
1744  #ifndef GLib_WIN32
1745  #pragma omp taskwait
1746  #endif
1747  }
1748  Sw->Stop(TStopwatch::Sort);
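// Continuing the illustration: the two key-value sorts leave (SrcCol1, EdgeCol1)
// ordered by source and (DstCol2, EdgeCol2) ordered by destination, e.g.
//   SrcCol1 = [5, 5, 9] / EdgeCol1 = [0, 1, 2],  DstCol2 = [7, 7, 9] / EdgeCol2 = [0, 2, 1],
// so the out-edges (resp. in-edges) of each node occupy one contiguous range.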
1749 
1750  Sw->Start(TStopwatch::Group);
1751  TInt NumThreads = omp_get_max_threads();
1752  TInt PartSize = (NumRows/NumThreads);
1753 
1754  // Find the offsets of all partitions, each of which covers a contiguous range of rows.
1755  // Rows that share a source (or destination) node are guaranteed to stay in the same partition.
1756  TIntV SrcOffsets, DstOffsets;
1757  SrcOffsets.Add(0);
1758  for (TInt i = 1; i < NumThreads; i++) {
1759  TInt CurrOffset = i * PartSize;
1760  while (CurrOffset < (i+1) * PartSize &&
1761  SrcCol1[CurrOffset-1] == SrcCol1[CurrOffset]) {
1762  // ensure that rows from the same sources are grouped together
1763  CurrOffset++;
1764  }
1765  if (CurrOffset < (i+1) * PartSize) { SrcOffsets.Add(CurrOffset); }
1766  }
1767  SrcOffsets.Add(NumRows);
1768 
1769  DstOffsets.Add(0);
1770  for (TInt i = 1; i < NumThreads; i++) {
1771  TInt CurrOffset = i * PartSize;
1772  while (CurrOffset < (i+1) * PartSize &&
1773  DstCol2[CurrOffset-1] == DstCol2[CurrOffset]) {
1774  // ensure that rows to the same destinations are grouped together
1775  CurrOffset++;
1776  }
1777  if (CurrOffset < (i+1) * PartSize) { DstOffsets.Add(CurrOffset); }
1778  }
1779  DstOffsets.Add(NumRows);
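// Example (editor's note): with NumRows = 8 and NumThreads = 2, PartSize = 4 and the
// tentative boundary sits at offset 4; if SrcCol1[3] == SrcCol1[4], the boundary slides
// right until the value changes (or the partition is exhausted), so the row range of a
// single node is never split across partitions.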
1780 
1781  TInt SrcPartCnt = SrcOffsets.Len()-1; // number of partitions
1782  TInt DstPartCnt = DstOffsets.Len()-1; // number of partitions
1783 
1784  // count the number of source nodes and destination nodes in each partition
1785  TIntV SrcNodeCounts, DstNodeCounts;
1786  SrcNodeCounts.Reserve(SrcPartCnt, SrcPartCnt);
1787  DstNodeCounts.Reserve(DstPartCnt, DstPartCnt);
1788 
1789  #pragma omp parallel for schedule(dynamic)
1790  for (int t = 0; t < SrcPartCnt+DstPartCnt; t++) {
1791  if (t < SrcPartCnt) {
1792  TInt i = t;
1793  if (SrcOffsets[i] != SrcOffsets[i+1]) {
1794  SrcNodeCounts[i] = 1;
1795  TInt CurrNode = SrcCol1[SrcOffsets[i]];
1796  for (TInt j = SrcOffsets[i]+1; j < SrcOffsets[i+1]; j++) {
1797  while (j < SrcOffsets[i+1] && SrcCol1[j] == CurrNode) { j++; }
1798  if (j < SrcOffsets[i+1]) {
1799  SrcNodeCounts[i]++;
1800  CurrNode = SrcCol1[j];
1801  }
1802  }
1803  }
1804  } else {
1805  TInt i = t - SrcPartCnt;
1806  if (DstOffsets[i] != DstOffsets[i+1]) {
1807  DstNodeCounts[i] = 1;
1808  TInt CurrNode = DstCol2[DstOffsets[i]];
1809  for (TInt j = DstOffsets[i]+1; j < DstOffsets[i+1]; j++) {
1810  while (j < DstOffsets[i+1] && DstCol2[j] == CurrNode) { j++; }
1811  if (j < DstOffsets[i+1]) {
1812  DstNodeCounts[i]++;
1813  CurrNode = DstCol2[j];
1814  }
1815  }
1816  }
1817  }
1818  }
1819 
1820  TInt TotalSrcNodes = 0;
1821  TIntV SrcIdOffsets;
1822  for (int i = 0; i < SrcPartCnt; i++) {
1823  SrcIdOffsets.Add(TotalSrcNodes);
1824  TotalSrcNodes += SrcNodeCounts[i];
1825  }
1826 
1827  TInt TotalDstNodes = 0;
1828  TIntV DstIdOffsets;
1829  for (int i = 0; i < DstPartCnt; i++) {
1830  DstIdOffsets.Add(TotalDstNodes);
1831  TotalDstNodes += DstNodeCounts[i];
1832  }
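// Editor's note: SrcIdOffsets/DstIdOffsets are exclusive prefix sums of the per-partition
// node counts (e.g. counts [3, 1, 2] give offsets [0, 3, 4]); each partition later writes
// its (node id, start offset) pairs into its own disjoint slice of SrcNodeIds/DstNodeIds.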
1833 
1834  // printf("Total Src = %d, Total Dst = %d\n", TotalSrcNodes.Val, TotalDstNodes.Val);
1835 
1836  // find vector of (node_id, start_offset) where start_offset is the index of the first row with node_id
1837  TIntPrV SrcNodeIds, DstNodeIds;
1838  #pragma omp parallel sections
1839  {
1840  #pragma omp section
1841  { SrcNodeIds.Reserve(TotalSrcNodes, TotalSrcNodes); }
1842  #pragma omp section
1843  { DstNodeIds.Reserve(TotalDstNodes, TotalDstNodes); }
1844  }
1845 
1846  // Find the starting offset of each node (in both src and dst)
1847  #pragma omp parallel for schedule(dynamic)
1848  for (int t = 0; t < SrcPartCnt+DstPartCnt; t++) {
1849  if (t < SrcPartCnt) {
1850  TInt i = t;
1851  if (SrcOffsets[i] != SrcOffsets[i+1]) {
1852  TInt CurrNode = SrcCol1[SrcOffsets[i]];
1853  TInt ThreadOffset = SrcIdOffsets[i];
1854  SrcNodeIds[ThreadOffset] = TIntPr(CurrNode, SrcOffsets[i]);
1855  TInt CurrCount = 1;
1856  for (TInt j = SrcOffsets[i]+1; j < SrcOffsets[i+1]; j++) {
1857  while (j < SrcOffsets[i+1] && SrcCol1[j] == CurrNode) { j++; }
1858  if (j < SrcOffsets[i+1]) {
1859  CurrNode = SrcCol1[j];
1860  SrcNodeIds[ThreadOffset+CurrCount] = TIntPr(CurrNode, j);
1861  CurrCount++;
1862  }
1863  }
1864  }
1865  } else {
1866  TInt i = t - SrcPartCnt;
1867  if (DstOffsets[i] != DstOffsets[i+1]) {
1868  TInt CurrNode = DstCol2[DstOffsets[i]];
1869  TInt ThreadOffset = DstIdOffsets[i];
1870  DstNodeIds[ThreadOffset] = TIntPr(CurrNode, DstOffsets[i]);
1871  TInt CurrCount = 1;
1872  for (TInt j = DstOffsets[i]+1; j < DstOffsets[i+1]; j++) {
1873  while (j < DstOffsets[i+1] && DstCol2[j] == CurrNode) { j++; }
1874  if (j < DstOffsets[i+1]) {
1875  CurrNode = DstCol2[j];
1876  DstNodeIds[ThreadOffset+CurrCount] = TIntPr(CurrNode, j);
1877  CurrCount++;
1878  }
1879  }
1880  }
1881  }
1882  }
1883  Sw->Stop(TStopwatch::Group);
1884 
1886  // Find the combined neighborhood (both out-neighbors and in-neighbors) of each node
1887  TIntTrV Nodes;
1888  Nodes.Reserve(TotalSrcNodes+TotalDstNodes);
1889 
1890  TInt i = 0, j = 0;
1891  while (i < TotalSrcNodes && j < TotalDstNodes) {
1892  if (SrcNodeIds[i].Val1 == DstNodeIds[j].Val1) {
1893  Nodes.Add(TIntTr(SrcNodeIds[i].Val1, i, j));
1894  i++;
1895  j++;
1896  } else if (SrcNodeIds[i].Val1 < DstNodeIds[j].Val1) {
1897  Nodes.Add(TIntTr(SrcNodeIds[i].Val1, i, -1));
1898  i++;
1899  } else {
1900  Nodes.Add(TIntTr(DstNodeIds[j].Val1, -1, j));
1901  j++;
1902  }
1903  }
1904  for (; i < TotalSrcNodes; i++) { Nodes.Add(TIntTr(SrcNodeIds[i].Val1, i, -1)); }
1905  for (; j < TotalDstNodes; j++) { Nodes.Add(TIntTr(DstNodeIds[j].Val1, -1, j)); }
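// Editor's note: the loop above is a standard two-way merge of the sorted id lists,
// emitting one (node id, index into SrcNodeIds, index into DstNodeIds) triple per
// distinct node and using -1 to mark nodes that occur only as a source or only as a
// destination.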
1907 
1909  TInt NumNodes = Nodes.Len();
1910  PGraphMP Graph = PGraphMP::TObj::New(NumNodes, NumRows);
1911 // NumThreads = omp_get_max_threads();
1912 // int Delta = (NumNodes+NumThreads-1)/NumThreads;
1913 
1914  TVec<TIntV> InVV(NumNodes);
1915  TVec<TIntV> OutVV(NumNodes);
1916 
1917 // omp_set_num_threads(NumThreads);
1918  #pragma omp parallel for schedule(static,100)
1919  for (int m = 0; m < NumNodes; m++) {
1920  //double startTr = omp_get_wtime();
1921  //TIntV OutV, InV;
1922  TInt n, i, j;
1923  Nodes[m].GetVal(n, i, j);
1924  if (i >= 0) {
1925  TInt Offset = SrcNodeIds[i].GetVal2();
1926  TInt Sz = EdgeCol1.Len()-Offset;
1927  if (i < SrcNodeIds.Len()-1) { Sz = SrcNodeIds[i+1].GetVal2()-Offset; }
1928  OutVV[m].Reserve(Sz);
1929  OutVV[m].CopyUniqueFrom(EdgeCol1, Offset, Sz);
1930  }
1931  if (j >= 0) {
1932  TInt Offset = DstNodeIds[j].GetVal2();
1933  TInt Sz = EdgeCol2.Len()-Offset;
1934  if (j < DstNodeIds.Len()-1) { Sz = DstNodeIds[j+1].GetVal2()-Offset; }
1935  InVV[m].Reserve(Sz);
1936  InVV[m].CopyUniqueFrom(EdgeCol2, Offset, Sz);
1937  }
1938  Graph->AddNodeWithEdges(n, InVV[m], OutVV[m]);
1939  }
1940  Graph->SetNodes(NumNodes);
1942 
1944  omp_set_num_threads(omp_get_max_threads());
1945  if (NodeType == atInt) {
1946  #pragma omp parallel for schedule(static)
1947  for (int i = 0; i < Partitions.Len(); i++) {
1948  TRowIterator RowI(Partitions[i].GetVal1(), Table());
1949  TRowIterator EndI(Partitions[i].GetVal2(), Table());
1950  while (RowI < EndI) {
1951  TInt RowId = RowI.GetRowIdx(); // EdgeId
1952  TInt SrcId = RowI.GetIntAttr(SrcColIdx);
1953  TInt DstId = RowI.GetIntAttr(DstColIdx);
1954  Graph->AddEdgeUnchecked(RowId, SrcId, DstId);
1955  RowI++;
1956  }
1957  }
1958  }
1959  else if (NodeType == atStr) {
1960  #pragma omp parallel for schedule(static)
1961  for (int i = 0; i < Partitions.Len(); i++) {
1962  TRowIterator RowI(Partitions[i].GetVal1(), Table());
1963  TRowIterator EndI(Partitions[i].GetVal2(), Table());
1964  while (RowI < EndI) {
1965  TInt RowId = RowI.GetRowIdx(); // EdgeId
1966  TInt SrcId = RowI.GetStrMapById(SrcColIdx);
1967  TInt DstId = RowI.GetStrMapById(DstColIdx);
1968  Graph->AddEdgeUnchecked(RowId, SrcId, DstId);
1969  RowI++;
1970  }
1971  }
1972 
1973  }
1974 
1975  Graph->SetEdges(NumRows);
1976  Graph->SetMxEId(NumRows);
1978 
1979  // make single pass over all rows in the table to add attributes
1980  for (int CurrRowIdx = 0; CurrRowIdx < (Table->Next).Len(); CurrRowIdx++) {
1981  if ((Table->Next)[CurrRowIdx] == Table->Invalid) {
1982  continue;
1983  }
1984  for (TInt ea_i = 0; ea_i < EdgeAttrV.Len(); ea_i++) {
1985  TStr ColName = EdgeAttrV[ea_i];
1986  TAttrType T = Table->GetColType(ColName);
1987  TInt Index = Table->GetColIdx(ColName);
1988  switch (T) {
1989  case atInt:
1990  Graph->AddIntAttrDatE(CurrRowIdx, Table->IntCols[Index][CurrRowIdx], ColName);
1991  break;
1992  case atFlt:
1993  Graph->AddFltAttrDatE(CurrRowIdx, Table->FltCols[Index][CurrRowIdx], ColName);
1994  break;
1995  case atStr:
1996  Graph->AddStrAttrDatE(CurrRowIdx, Table->GetStrValIdx(Index, CurrRowIdx), ColName);
1997  break;
1998  }
1999  }
2000  }
2001  // double endAdd = omp_get_wtime();
2002  // printf("Add time = %f\n", endAdd-endAlloc);
2003 
2004  return Graph;
2005 }
2006 #endif // GCC_ATOMIC
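// Usage sketch (editor's illustration, not from the original sources): the parallel
// variant requires an OpenMP build with GCC_ATOMIC defined and a multiprocessing
// network type; PNEANetMP is SNAP's parallel attribute network, the rest is hypothetical.
//
//   TStrV EdgeAttrV;
//   EdgeAttrV.Add("weight");
//   PNEANetMP NetMP = TSnap::ToNetworkMP<PNEANetMP>(
//       EdgeTable, "SrcId", "DstId", EdgeAttrV, aaFirst);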
2007 
2009 template<class PGraph>
2010 PGraph ToNetwork(PTable Table,
2011  const TStr& SrcCol, const TStr& DstCol,
2012  TStrV& EdgeAttrV, PTable NodeTable, const TStr& NodeCol, TStrV& NodeAttrV,
2013  TAttrAggr AggrPolicy) {
2014  PGraph Graph = PGraph::TObj::New();
2015 
2016  const TAttrType NodeType = Table->GetColType(SrcCol);
2017  Assert(NodeType == Table->GetColType(DstCol));
2018  const TInt SrcColIdx = Table->GetColIdx(SrcCol);
2019  const TInt DstColIdx = Table->GetColIdx(DstCol);
2020 
2021 
2022  const TAttrType NodeTypeN = NodeTable->GetColType(NodeCol);
2023  const TInt NodeColIdx = NodeTable->GetColIdx(NodeCol);
2024  THash<TInt, TStrIntVH> NodeIntAttrs;
2025  THash<TInt, TStrFltVH> NodeFltAttrs;
2026  THash<TInt, TStrStrVH> NodeStrAttrs;
2027 
2028 
2029  //Table->AddGraphAttributeV(SrcAttrV, false, true, false);
2030  //Table->AddGraphAttributeV(DstAttrV, false, false, true);
2031  //Table->AddGraphAttributeV(EdgeAttrV, true, false, true);
2032 
2033  // node values - i.e. the unique values of src/dst col
2034  //THashSet<TInt> IntNodeVals; // for both int and string node attr types.
2035  THash<TFlt, TInt> FltNodeVals;
2036 
2037  // make single pass over all rows in the table
2038  for (int CurrRowIdx = 0; CurrRowIdx < (Table->Next).Len(); CurrRowIdx++) {
2039  if ((Table->Next)[CurrRowIdx] == Table->Invalid) {
2040  continue;
2041  }
2042 
2043  // add src and dst nodes to the graph if they have not been seen earlier
2044  TInt SVal, DVal;
2045  if (NodeType == atFlt) {
2046  TFlt FSVal = (Table->FltCols)[SrcColIdx][CurrRowIdx];
2047  SVal = Table->CheckAndAddFltNode(Graph, FltNodeVals, FSVal);
2048  TFlt FDVal = (Table->FltCols)[DstColIdx][CurrRowIdx];
2049  DVal = Table->CheckAndAddFltNode(Graph, FltNodeVals, FDVal);
2050  }
2051  else if (NodeType == atInt || NodeType == atStr) {
2052  if (NodeType == atInt) {
2053  SVal = (Table->IntCols)[SrcColIdx][CurrRowIdx];
2054  DVal = (Table->IntCols)[DstColIdx][CurrRowIdx];
2055  }
2056  else {
2057  SVal = (Table->StrColMaps)[SrcColIdx][CurrRowIdx];
2058  // if (strlen(Table->GetContextKey(SVal)) == 0) { continue; } //illegal value
2059  DVal = (Table->StrColMaps)[DstColIdx][CurrRowIdx];
2060  // if (strlen(Table->GetContextKey(DVal)) == 0) { continue; } //illegal value
2061  }
2062  if (!Graph->IsNode(SVal)) { Graph->AddNode(SVal); }
2063  if (!Graph->IsNode(DVal)) { Graph->AddNode(DVal); }
2064  //CheckAndAddIntNode(Graph, IntNodeVals, SVal);
2065  //CheckAndAddIntNode(Graph, IntNodeVals, DVal);
2066  }
2067 
2068  // add edge and edge attributes
2069  Graph->AddEdge(SVal, DVal, CurrRowIdx);
2070 
2071  // add the row's edge attribute values to the graph (each row is its own edge, so no aggregation is applied here)
2072  for (TInt i = 0; i < EdgeAttrV.Len(); i++) {
2073  TStr ColName = EdgeAttrV[i];
2074  TAttrType T = Table->GetColType(ColName);
2075  TInt Index = Table->GetColIdx(ColName);
2076  switch (T) {
2077  case atInt:
2078  Graph->AddIntAttrDatE(CurrRowIdx, Table->IntCols[Index][CurrRowIdx], ColName);
2079  break;
2080  case atFlt:
2081  Graph->AddFltAttrDatE(CurrRowIdx, Table->FltCols[Index][CurrRowIdx], ColName);
2082  break;
2083  case atStr:
2084  Graph->AddStrAttrDatE(CurrRowIdx, Table->GetStrValIdx(Index, CurrRowIdx), ColName);
2085  break;
2086  }
2087  }
2088  }
2089 
2090 
2091  // Add node attributes
2092  if (NodeAttrV.Len() > 0) {
2093  for (int CurrRowIdx = 0; CurrRowIdx < (NodeTable->Next).Len(); CurrRowIdx++) {
2094  if ((NodeTable->Next)[CurrRowIdx] == NodeTable->Invalid) {
2095  continue;
2096  }
2097  TInt NId;
2098  if (NodeTypeN == atInt) {
2099  NId = (NodeTable->IntCols)[NodeColIdx][CurrRowIdx];
2100  }
2101  else if (NodeTypeN == atStr) {
2102  NId = (NodeTable->StrColMaps)[NodeColIdx][CurrRowIdx];
2103  }
2104  for (TInt i = 0; i < NodeAttrV.Len(); i++) {
2105  TStr ColName = NodeAttrV[i];
2106  TAttrType T = NodeTable->GetColType(ColName);
2107  TInt Index = NodeTable->GetColIdx(ColName);
2108  switch (T) {
2109  case atInt:
2110  Graph->AddIntAttrDatN(NId, NodeTable->IntCols[Index][CurrRowIdx], ColName);
2111  break;
2112  case atFlt:
2113  Graph->AddFltAttrDatN(NId, NodeTable->FltCols[Index][CurrRowIdx], ColName);
2114  break;
2115  case atStr:
2116  Graph->AddStrAttrDatN(NId, NodeTable->GetStrValIdx(Index, CurrRowIdx), ColName);
2117  break;
2118  }
2119  }
2120  }
2121  }
2122 
2123  return Graph;
2124 
2125 }
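// Usage sketch (editor's illustration, not from the original sources): this overload
// additionally copies node attributes from a second table keyed by NodeCol; all table
// and column names are hypothetical.
//
//   TStrV EdgeAttrV; EdgeAttrV.Add("weight");
//   TStrV NodeAttrV; NodeAttrV.Add("age");
//   PNEANet Net = TSnap::ToNetwork<PNEANet>(
//       EdgeTable, "SrcId", "DstId", EdgeAttrV,
//       NodeTable, "NId", NodeAttrV, aaFirst);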
2126 
2127 #ifdef GCC_ATOMIC
2128 
2130 template<class PGraphMP>
2131 inline PGraphMP ToNetworkMP(PTable Table,
2132  const TStr& SrcCol, const TStr& DstCol,
2133  TStrV& EdgeAttrV, PTable NodeTable, const TStr& NodeCol, TStrV& NodeAttrV,
2134  TAttrAggr AggrPolicy) {
2136  TStopwatch* Sw = TStopwatch::GetInstance(); // benchmarking stopwatch used by the Sw->Start/Stop calls below
2138  const TInt SrcColIdx = Table->GetColIdx(SrcCol);
2139  const TInt DstColIdx = Table->GetColIdx(DstCol);
2140  const TInt NumRows = Table->GetNumValidRows();
2141 
2142  const TAttrType NodeType = Table->GetColType(SrcCol);
2143  Assert(NodeType == Table->GetColType(DstCol));
2144 
2145 
2146  TIntV SrcCol1, EdgeCol1, EdgeCol2, DstCol2;
2147 
2148  const TAttrType NodeTypeN = NodeTable->GetColType(NodeCol);
2149  const TInt NodeColIdx = NodeTable->GetColIdx(NodeCol);
2150  THash<TInt, TStrIntVH> NodeIntAttrs;
2151  THash<TInt, TStrFltVH> NodeFltAttrs;
2152  THash<TInt, TStrStrVH> NodeStrAttrs;
2153 
2154  #pragma omp parallel sections num_threads(4)
2155  {
2156  #pragma omp section
2157  { SrcCol1.Reserve(NumRows, NumRows); }
2158  #pragma omp section
2159  { EdgeCol1.Reserve(NumRows, NumRows); }
2160  #pragma omp section
2161  { DstCol2.Reserve(NumRows, NumRows); }
2162  #pragma omp section
2163  { EdgeCol2.Reserve(NumRows, NumRows); }
2164  }
2166 
2168  TIntPrV Partitions;
2169  Table->GetPartitionRanges(Partitions, omp_get_max_threads());
2170  TInt PartitionSize = Partitions[0].GetVal2()-Partitions[0].GetVal1()+1;
2171 
2172  // double endPartition = omp_get_wtime();
2173  // printf("Partition time = %f\n", endPartition-endResize);
2174 
2175  omp_set_num_threads(omp_get_max_threads());
2176  if (NodeType == atInt) {
2177  #pragma omp parallel for schedule(static)
2178  for (int i = 0; i < Partitions.Len(); i++) {
2179  TRowIterator RowI(Partitions[i].GetVal1(), Table());
2180  TRowIterator EndI(Partitions[i].GetVal2(), Table());
2181  while (RowI < EndI) {
2182  TInt RowId = RowI.GetRowIdx();
2183  SrcCol1[RowId] = RowI.GetIntAttr(SrcColIdx);
2184  EdgeCol1[RowId] = RowId;
2185  DstCol2[RowId] = RowI.GetIntAttr(DstColIdx);
2186  EdgeCol2[RowId] = RowId;
2187  RowI++;
2188  }
2189  }
2190  }
2191  else if (NodeType == atStr) {
2192  #pragma omp parallel for schedule(static)
2193  for (int i = 0; i < Partitions.Len(); i++) {
2194  TRowIterator RowI(Partitions[i].GetVal1(), Table());
2195  TRowIterator EndI(Partitions[i].GetVal2(), Table());
2196  while (RowI < EndI) {
2197  TInt RowId = RowI.GetRowIdx();
2198  SrcCol1[RowId] = RowI.GetStrMapById(SrcColIdx);
2199  EdgeCol1[RowId] = RowId;
2200  DstCol2[RowId] = RowI.GetStrMapById(DstColIdx);
2201  EdgeCol2[RowId] = RowId;
2202  RowI++;
2203  }
2204  }
2205  }
2207 
2208  Sw->Start(TStopwatch::Sort);
2209  omp_set_num_threads(omp_get_max_threads());
2210  #pragma omp parallel
2211  {
2212  #pragma omp single nowait
2213  {
2214  #ifndef GLib_WIN32
2215  #pragma omp task untied shared(SrcCol1, EdgeCol1)
2216  #endif
2217  { TTable::QSortKeyVal(SrcCol1, EdgeCol1, 0, NumRows-1); }
2218  }
2219  #pragma omp single nowait
2220  {
2221  #ifndef GLib_WIN32
2222  #pragma omp task untied shared(EdgeCol2, DstCol2)
2223  #endif
2224  { TTable::QSortKeyVal(DstCol2, EdgeCol2, 0, NumRows-1); }
2225  }
2226  #ifndef GLib_WIN32
2227  #pragma omp taskwait
2228  #endif
2229  }
2230  Sw->Stop(TStopwatch::Sort);
2231 
2232  Sw->Start(TStopwatch::Group);
2233  TInt NumThreads = omp_get_max_threads();
2234  TInt PartSize = (NumRows/NumThreads);
2235 
2236  // Find the offsets of all partitions, each of which covers a contiguous range of rows.
2237  // Rows that share a source (or destination) node are guaranteed to stay in the same partition.
2238  TIntV SrcOffsets, DstOffsets;
2239  SrcOffsets.Add(0);
2240  for (TInt i = 1; i < NumThreads; i++) {
2241  TInt CurrOffset = i * PartSize;
2242  while (CurrOffset < (i+1) * PartSize &&
2243  SrcCol1[CurrOffset-1] == SrcCol1[CurrOffset]) {
2244  // ensure that rows from the same sources are grouped together
2245  CurrOffset++;
2246  }
2247  if (CurrOffset < (i+1) * PartSize) { SrcOffsets.Add(CurrOffset); }
2248  }
2249  SrcOffsets.Add(NumRows);
2250 
2251  DstOffsets.Add(0);
2252  for (TInt i = 1; i < NumThreads; i++) {
2253  TInt CurrOffset = i * PartSize;
2254  while (CurrOffset < (i+1) * PartSize &&
2255  DstCol2[CurrOffset-1] == DstCol2[CurrOffset]) {
2256  // ensure that rows to the same destinations are grouped together
2257  CurrOffset++;
2258  }
2259  if (CurrOffset < (i+1) * PartSize) { DstOffsets.Add(CurrOffset); }
2260  }
2261  DstOffsets.Add(NumRows);
2262 
2263  TInt SrcPartCnt = SrcOffsets.Len()-1; // number of partitions
2264  TInt DstPartCnt = DstOffsets.Len()-1; // number of partitions
2265 
2266  // count the number of source nodes and destination nodes in each partition
2267  TIntV SrcNodeCounts, DstNodeCounts;
2268  SrcNodeCounts.Reserve(SrcPartCnt, SrcPartCnt);
2269  DstNodeCounts.Reserve(DstPartCnt, DstPartCnt);
2270 
2271  #pragma omp parallel for schedule(dynamic)
2272  for (int t = 0; t < SrcPartCnt+DstPartCnt; t++) {
2273  if (t < SrcPartCnt) {
2274  TInt i = t;
2275  if (SrcOffsets[i] != SrcOffsets[i+1]) {
2276  SrcNodeCounts[i] = 1;
2277  TInt CurrNode = SrcCol1[SrcOffsets[i]];
2278  for (TInt j = SrcOffsets[i]+1; j < SrcOffsets[i+1]; j++) {
2279  while (j < SrcOffsets[i+1] && SrcCol1[j] == CurrNode) { j++; }
2280  if (j < SrcOffsets[i+1]) {
2281  SrcNodeCounts[i]++;
2282  CurrNode = SrcCol1[j];
2283  }
2284  }
2285  }
2286  } else {
2287  TInt i = t - SrcPartCnt;
2288  if (DstOffsets[i] != DstOffsets[i+1]) {
2289  DstNodeCounts[i] = 1;
2290  TInt CurrNode = DstCol2[DstOffsets[i]];
2291  for (TInt j = DstOffsets[i]+1; j < DstOffsets[i+1]; j++) {
2292  while (j < DstOffsets[i+1] && DstCol2[j] == CurrNode) { j++; }
2293  if (j < DstOffsets[i+1]) {
2294  DstNodeCounts[i]++;
2295  CurrNode = DstCol2[j];
2296  }
2297  }
2298  }
2299  }
2300  }
2301 
2302  TInt TotalSrcNodes = 0;
2303  TIntV SrcIdOffsets;
2304  for (int i = 0; i < SrcPartCnt; i++) {
2305  SrcIdOffsets.Add(TotalSrcNodes);
2306  TotalSrcNodes += SrcNodeCounts[i];
2307  }
2308 
2309  TInt TotalDstNodes = 0;
2310  TIntV DstIdOffsets;
2311  for (int i = 0; i < DstPartCnt; i++) {
2312  DstIdOffsets.Add(TotalDstNodes);
2313  TotalDstNodes += DstNodeCounts[i];
2314  }
2315 
2316  // printf("Total Src = %d, Total Dst = %d\n", TotalSrcNodes.Val, TotalDstNodes.Val);
2317 
2318  // find vector of (node_id, start_offset) where start_offset is the index of the first row with node_id
2319  TIntPrV SrcNodeIds, DstNodeIds;
2320  #pragma omp parallel sections
2321  {
2322  #pragma omp section
2323  { SrcNodeIds.Reserve(TotalSrcNodes, TotalSrcNodes); }
2324  #pragma omp section
2325  { DstNodeIds.Reserve(TotalDstNodes, TotalDstNodes); }
2326  }
2327 
2328  // Find the starting offset of each node (in both src and dst)
2329  #pragma omp parallel for schedule(dynamic)
2330  for (int t = 0; t < SrcPartCnt+DstPartCnt; t++) {
2331  if (t < SrcPartCnt) {
2332  TInt i = t;
2333  if (SrcOffsets[i] != SrcOffsets[i+1]) {
2334  TInt CurrNode = SrcCol1[SrcOffsets[i]];
2335  TInt ThreadOffset = SrcIdOffsets[i];
2336  SrcNodeIds[ThreadOffset] = TIntPr(CurrNode, SrcOffsets[i]);
2337  TInt CurrCount = 1;
2338  for (TInt j = SrcOffsets[i]+1; j < SrcOffsets[i+1]; j++) {
2339  while (j < SrcOffsets[i+1] && SrcCol1[j] == CurrNode) { j++; }
2340  if (j < SrcOffsets[i+1]) {
2341  CurrNode = SrcCol1[j];
2342  SrcNodeIds[ThreadOffset+CurrCount] = TIntPr(CurrNode, j);
2343  CurrCount++;
2344  }
2345  }
2346  }
2347  } else {
2348  TInt i = t - SrcPartCnt;
2349  if (DstOffsets[i] != DstOffsets[i+1]) {
2350  TInt CurrNode = DstCol2[DstOffsets[i]];
2351  TInt ThreadOffset = DstIdOffsets[i];
2352  DstNodeIds[ThreadOffset] = TIntPr(CurrNode, DstOffsets[i]);
2353  TInt CurrCount = 1;
2354  for (TInt j = DstOffsets[i]+1; j < DstOffsets[i+1]; j++) {
2355  while (j < DstOffsets[i+1] && DstCol2[j] == CurrNode) { j++; }
2356  if (j < DstOffsets[i+1]) {
2357  CurrNode = DstCol2[j];
2358  DstNodeIds[ThreadOffset+CurrCount] = TIntPr(CurrNode, j);
2359  CurrCount++;
2360  }
2361  }
2362  }
2363  }
2364  }
2365  Sw->Stop(TStopwatch::Group);
2366 
2368  // Find the combined neighborhood (both out-neighbors and in-neighbors) of each node
2369  TIntTrV Nodes;
2370  Nodes.Reserve(TotalSrcNodes+TotalDstNodes);
2371 
2372  TInt i = 0, j = 0;
2373  while (i < TotalSrcNodes && j < TotalDstNodes) {
2374  if (SrcNodeIds[i].Val1 == DstNodeIds[j].Val1) {
2375  Nodes.Add(TIntTr(SrcNodeIds[i].Val1, i, j));
2376  i++;
2377  j++;
2378  } else if (SrcNodeIds[i].Val1 < DstNodeIds[j].Val1) {
2379  Nodes.Add(TIntTr(SrcNodeIds[i].Val1, i, -1));
2380  i++;
2381  } else {
2382  Nodes.Add(TIntTr(DstNodeIds[j].Val1, -1, j));
2383  j++;
2384  }
2385  }
2386  for (; i < TotalSrcNodes; i++) { Nodes.Add(TIntTr(SrcNodeIds[i].Val1, i, -1)); }
2387  for (; j < TotalDstNodes; j++) { Nodes.Add(TIntTr(DstNodeIds[j].Val1, -1, j)); }
2389 
2391  TInt NumNodes = Nodes.Len();
2392  PGraphMP Graph = PGraphMP::TObj::New(NumNodes, NumRows);
2393 // NumThreads = omp_get_max_threads();
2394 // int Delta = (NumNodes+NumThreads-1)/NumThreads;
2395 
2396  TVec<TIntV> InVV(NumNodes);
2397  TVec<TIntV> OutVV(NumNodes);
2398 
2399 // omp_set_num_threads(NumThreads);
2400  #pragma omp parallel for schedule(static,100)
2401  for (int m = 0; m < NumNodes; m++) {
2402  //double startTr = omp_get_wtime();
2403  //TIntV OutV, InV;
2404  TInt n, i, j;
2405  Nodes[m].GetVal(n, i, j);
2406  if (i >= 0) {
2407  TInt Offset = SrcNodeIds[i].GetVal2();
2408  TInt Sz = EdgeCol1.Len()-Offset;
2409  if (i < SrcNodeIds.Len()-1) { Sz = SrcNodeIds[i+1].GetVal2()-Offset; }
2410  OutVV[m].Reserve(Sz);
2411  OutVV[m].CopyUniqueFrom(EdgeCol1, Offset, Sz);
2412  }
2413  if (j >= 0) {
2414  TInt Offset = DstNodeIds[j].GetVal2();
2415  TInt Sz = EdgeCol2.Len()-Offset;
2416  if (j < DstNodeIds.Len()-1) { Sz = DstNodeIds[j+1].GetVal2()-Offset; }
2417  InVV[m].Reserve(Sz);
2418  InVV[m].CopyUniqueFrom(EdgeCol2, Offset, Sz);
2419  }
2420  Graph->AddNodeWithEdges(n, InVV[m], OutVV[m]);
2421  }
2422  Graph->SetNodes(NumNodes);
2424 
2426  omp_set_num_threads(omp_get_max_threads());
2427  if (NodeType == atInt) {
2428  #pragma omp parallel for schedule(static)
2429  for (int i = 0; i < Partitions.Len(); i++) {
2430  TRowIterator RowI(Partitions[i].GetVal1(), Table());
2431  TRowIterator EndI(Partitions[i].GetVal2(), Table());
2432  while (RowI < EndI) {
2433  TInt RowId = RowI.GetRowIdx(); // EdgeId
2434  TInt SrcId = RowI.GetIntAttr(SrcColIdx);
2435  TInt DstId = RowI.GetIntAttr(DstColIdx);
2436  Graph->AddEdgeUnchecked(RowId, SrcId, DstId);
2437  RowI++;
2438  }
2439  }
2440  }
2441  else if (NodeType == atStr) {
2442  #pragma omp parallel for schedule(static)
2443  for (int i = 0; i < Partitions.Len(); i++) {
2444  TRowIterator RowI(Partitions[i].GetVal1(), Table());
2445  TRowIterator EndI(Partitions[i].GetVal2(), Table());
2446  while (RowI < EndI) {
2447  TInt RowId = RowI.GetRowIdx(); // EdgeId
2448  TInt SrcId = RowI.GetStrMapById(SrcColIdx);
2449  TInt DstId = RowI.GetStrMapById(DstColIdx);
2450  Graph->AddEdgeUnchecked(RowId, SrcId, DstId);
2451  RowI++;
2452  }
2453  }
2454 
2455  }
2456 
2457  Graph->SetEdges(NumRows);
2458  Graph->SetMxEId(NumRows);
2460 
2461  // make single pass over all rows in the table to add attributes
2462  for (int CurrRowIdx = 0; CurrRowIdx < (Table->Next).Len(); CurrRowIdx++) {
2463  if ((Table->Next)[CurrRowIdx] == Table->Invalid) {
2464  continue;
2465  }
2466  for (TInt ea_i = 0; ea_i < EdgeAttrV.Len(); ea_i++) {
2467  TStr ColName = EdgeAttrV[ea_i];
2468  TAttrType T = Table->GetColType(ColName);
2469  TInt Index = Table->GetColIdx(ColName);
2470  switch (T) {
2471  case atInt:
2472  Graph->AddIntAttrDatE(CurrRowIdx, Table->IntCols[Index][CurrRowIdx], ColName);
2473  break;
2474  case atFlt:
2475  Graph->AddFltAttrDatE(CurrRowIdx, Table->FltCols[Index][CurrRowIdx], ColName);
2476  break;
2477  case atStr:
2478  Graph->AddStrAttrDatE(CurrRowIdx, Table->GetStrValIdx(Index, CurrRowIdx), ColName);
2479  break;
2480  }
2481  }
2482  }
2483 
2484  // Add node attributes
2485  if (NodeAttrV.Len() > 0) {
2486  for (int CurrRowIdx = 0; CurrRowIdx < (NodeTable->Next).Len(); CurrRowIdx++) {
2487  if ((NodeTable->Next)[CurrRowIdx] == NodeTable->Invalid) {
2488  continue;
2489  }
2490  TInt NId;
2491  if (NodeTypeN == atInt) {
2492  NId = (NodeTable->IntCols)[NodeColIdx][CurrRowIdx];
2493  }
2494  else if (NodeTypeN == atStr) {
2495  NId = (NodeTable->StrColMaps)[NodeColIdx][CurrRowIdx];
2496  }
2497  for (TInt i = 0; i < NodeAttrV.Len(); i++) {
2498  TStr ColName = NodeAttrV[i];
2499  TAttrType T = NodeTable->GetColType(ColName);
2500  TInt Index = NodeTable->GetColIdx(ColName);
2501  switch (T) {
2502  case atInt:
2503  Graph->AddIntAttrDatN(NId, NodeTable->IntCols[Index][CurrRowIdx], ColName);
2504  break;
2505  case atFlt:
2506  Graph->AddFltAttrDatN(NId, NodeTable->FltCols[Index][CurrRowIdx], ColName);
2507  break;
2508  case atStr:
2509  Graph->AddStrAttrDatN(NId, NodeTable->GetStrValIdx(Index, CurrRowIdx), ColName);
2510  break;
2511  }
2512  }
2513  }
2514  }
2515  // double endAdd = omp_get_wtime();
2516  // printf("Add time = %f\n", endAdd-endAlloc);
2517 
2518  return Graph;
2519 }
2520 #endif // GCC_ATOMIC
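// Usage sketch (editor's illustration, not from the original sources): the parallel
// counterpart of the node-table overload above, under the same hypothetical names.
//
//   PNEANetMP NetMP = TSnap::ToNetworkMP<PNEANetMP>(
//       EdgeTable, "SrcId", "DstId", EdgeAttrV,
//       NodeTable, "NId", NodeAttrV, aaFirst);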
2521 
2522 }; // TSnap namespace
2523 
2524 #endif // CONV_H
2525 