00001 typedef struct dblib_buffer_row {
00003 TDSRESULTINFO *resinfo;
00005 unsigned char *row_data;
00007 DBINT row;
00009 TDS_INT *sizes;
00010 } DBLIB_BUFFER_ROW;
00011
00012 static void buffer_struct_print(const DBPROC_ROWBUF *buf);
00013 static int buffer_save_row(DBPROCESS *dbproc);
00014 static DBLIB_BUFFER_ROW* buffer_row_address(const DBPROC_ROWBUF * buf, int idx);
00015
00045 static int
00046 buffer_count(const DBPROC_ROWBUF *buf)
00047 {
00048 return (buf->head > buf->tail) ?
00049 buf->head - buf->tail :
00050 buf->capacity - (buf->tail - buf->head);
00051 }
00052
00056 static int
00057 buffer_is_full(const DBPROC_ROWBUF *buf)
00058 {
00059 return buf->capacity == buffer_count(buf) && buf->capacity > 1;
00060 }
00061
00062 static int
00063 buffer_index_valid(const DBPROC_ROWBUF *buf, int idx)
00064 {
00065 if (buf->tail <= buf->head)
00066 if (buf->head <= idx && idx <= buf->tail)
00067 return 1;
00068
00069 if (0 <= idx && idx <= buf->head)
00070 return 1;
00071
00072 if (buf->tail <= idx && idx < buf->capacity)
00073 return 1;
00074 #if 0
00075 printf("buffer_index_valid: idx = %d\n", idx);
00076 buffer_struct_print(buf);
00077 #endif
00078 return 0;
00079 }
00080
00081 static void
00082 buffer_free_row(DBLIB_BUFFER_ROW *row)
00083 {
00084 if (row->sizes)
00085 TDS_ZERO_FREE(row->sizes);
00086 if (row->row_data) {
00087 tds_free_row(row->resinfo, row->row_data);
00088 row->row_data = NULL;
00089 }
00090 tds_free_results(row->resinfo);
00091 row->resinfo = NULL;
00092 }
00093
00094
00095
00096
00097
00098
00099
00100
00101
00102
00103
00104 static void
00105 buffer_free(DBPROC_ROWBUF *buf)
00106 {
00107 if (buf->rows != NULL) {
00108 int i;
00109 for (i = 0; i < buf->capacity; ++i)
00110 buffer_free_row(&buf->rows[i]);
00111 TDS_ZERO_FREE(buf->rows);
00112 }
00113 }
00114
00115
00116
00117
00118
00119 static void
00120 buffer_reset(DBPROC_ROWBUF *buf)
00121 {
00122 buf->head = 0;
00123 buf->current = buf->tail = buf->capacity;
00124 }
00125
00126 static int
00127 buffer_idx_increment(const DBPROC_ROWBUF *buf, int idx)
00128 {
00129 if (++idx >= buf->capacity) {
00130 idx = 0;
00131 }
00132 return idx;
00133 }
00134
00139 static DBLIB_BUFFER_ROW*
00140 buffer_row_address(const DBPROC_ROWBUF * buf, int idx)
00141 {
00142 if (!(idx >= 0 && idx < buf->capacity)) {
00143 printf("idx is %d:\n", idx);
00144 buffer_struct_print(buf);
00145 assert(idx >= 0);
00146 assert(idx < buf->capacity);
00147 }
00148
00149 return &(buf->rows[idx]);
00150 }
00151
00155 static DBINT
00156 buffer_idx2row(const DBPROC_ROWBUF *buf, int idx)
00157 {
00158 return buffer_row_address(buf, idx)->row;
00159 }
00160
00164 static int
00165 buffer_row2idx(const DBPROC_ROWBUF *buf, int row_number)
00166 {
00167 int i, ii, idx = -1;
00168
00169 if (buf->tail == buf->capacity) {
00170 assert (buf->head == 0);
00171 return -1;
00172 }
00173
00174
00175
00176
00177
00178 for (ii=0, i = buf->tail; i != buf->head || ii == 0; i = buffer_idx_increment(buf, i)) {
00179 if( buffer_idx2row(buf, i) == row_number) {
00180 idx = i;
00181 break;
00182 }
00183 assert(ii++ < buf->capacity);
00184 }
00185
00186 return idx;
00187 }
00188
00194 static void
00195 buffer_delete_rows(DBPROC_ROWBUF * buf, int count)
00196 {
00197 int i;
00198
00199 if (count < 0 || count > buffer_count(buf)) {
00200 count = buffer_count(buf);
00201 }
00202
00203 for (i=0; i < count; i++) {
00204 if (buf->tail < buf->capacity)
00205 buffer_free_row(&buf->rows[i]);
00206 buf->tail = buffer_idx_increment(buf, buf->tail);
00207
00208
00209
00210
00211 if (buf->tail == buf->head) {
00212 buffer_reset(buf);
00213 break;
00214 }
00215 }
00216 #if 0
00217 buffer_struct_print(buf);
00218 #endif
00219 }
00220
00221 static void
00222 buffer_transfer_bound_data(DBPROC_ROWBUF *buf, TDS_INT res_type, TDS_INT compute_id, DBPROCESS * dbproc, int idx)
00223 {
00224
00225 int i;
00226 TDSCOLUMN *curcol;
00227 TDSRESULTINFO *resinfo;
00228 int srctype;
00229 BYTE *src;
00230 int desttype;
00231 const DBLIB_BUFFER_ROW *row;
00232
00233 assert(buffer_index_valid(buf, idx));
00234
00235 row = buffer_row_address(buf, idx);
00236 assert(row->resinfo);
00237 resinfo = row->resinfo;
00238
00239 for (i = 0; i < resinfo->num_cols; i++) {
00240 DBINT srclen;
00241
00242 curcol = resinfo->columns[i];
00243 if (row->sizes)
00244 curcol->column_cur_size = row->sizes[i];
00245
00246 if (curcol->column_nullbind) {
00247 if (curcol->column_cur_size < 0) {
00248 *(DBINT *)(curcol->column_nullbind) = -1;
00249 } else {
00250 *(DBINT *)(curcol->column_nullbind) = 0;
00251 }
00252 }
00253 if (!curcol->column_varaddr)
00254 continue;
00255
00256 src = (row->row_data ? row->row_data : row->resinfo->current_row) + curcol->column_offset;
00257 srclen = curcol->column_cur_size;
00258 if (is_blob_type(curcol->column_type)) {
00259 src = (BYTE *) ((TDSBLOB *) src)->textvalue;
00260 }
00261 desttype = _db_get_server_type(curcol->column_bindtype);
00262 srctype = tds_get_conversion_type(curcol->column_type, curcol->column_size);
00263
00264 if (srclen < 0) {
00265 _set_null_value((BYTE *) curcol->column_varaddr, desttype, curcol->column_bindlen);
00266 } else {
00267 copy_data_to_host_var(dbproc, srctype, src, srclen, desttype,
00268 (BYTE *) curcol->column_varaddr, curcol->column_bindlen,
00269 curcol->column_bindtype, curcol->column_nullbind);
00270 }
00271 }
00272
00273
00274
00275
00276
00277
00278
00279
00280 buf->current = buffer_idx_increment(buf, buf->current);
00281
00282 }
00283
00284 static void
00285 buffer_struct_print(const DBPROC_ROWBUF *buf)
00286 {
00287 assert(buf);
00288
00289 printf("\t%d rows in buffer\n", buffer_count(buf));
00290
00291 printf("\thead = %d\t", buf->head);
00292 printf("\ttail = %d\t", buf->tail);
00293 printf("\tcurrent = %d\n", buf->current);
00294 printf("\tcapacity = %d\t", buf->capacity);
00295 printf("\thead row number = %d\n", buf->received);
00296 }
00297
00298
00299
00316 static int
00317 buffer_current_index(const DBPROCESS *dbproc)
00318 {
00319 const DBPROC_ROWBUF *buf = &dbproc->row_buf;
00320 #if 0
00321 buffer_struct_print(buf);
00322 #endif
00323 if (buf->capacity <= 1)
00324 return -1;
00325 if (buf->current == buf->head || buf->current == buf->capacity)
00326 return -1;
00327
00328 assert(buf->current >= 0);
00329 assert(buf->current < buf->capacity);
00330
00331 if( buf->tail < buf->head) {
00332 assert(buf->tail < buf->current);
00333 assert(buf->current < buf->head);
00334 } else {
00335 if (buf->current > buf->head)
00336 assert(buf->current > buf->tail);
00337 }
00338 return buf->current;
00339 }
00340
00341
00342
00343
00344
00345 static void
00346 buffer_set_capacity(DBPROCESS *dbproc, int nrows)
00347 {
00348 DBPROC_ROWBUF *buf = &dbproc->row_buf;
00349
00350 buffer_free(buf);
00351
00352 memset(buf, 0, sizeof(DBPROC_ROWBUF));
00353
00354 if (0 == nrows) {
00355 buf->capacity = 1;
00356 return;
00357 }
00358
00359 assert(0 < nrows);
00360
00361 buf->capacity = nrows;
00362 }
00363
00364
00365
00366
00367
00368
00369
00370
00371
00372 static void
00373 buffer_alloc(DBPROCESS *dbproc)
00374 {
00375 DBPROC_ROWBUF *buf = &dbproc->row_buf;
00376
00377
00378
00379 assert(buf);
00380 assert(buf->capacity > 0);
00381 assert(buf->rows == NULL);
00382
00383 buf->rows = (DBLIB_BUFFER_ROW *) calloc(buf->capacity, sizeof(DBLIB_BUFFER_ROW));
00384
00385 assert(buf->rows);
00386
00387 buffer_reset(buf);
00388
00389 buf->received = 0;
00390 }
00391
00396 static int
00397 buffer_add_row(DBPROCESS *dbproc, TDSRESULTINFO *resinfo)
00398 {
00399 DBPROC_ROWBUF *buf = &dbproc->row_buf;
00400 DBLIB_BUFFER_ROW *row;
00401 int i;
00402
00403 assert(buf->capacity >= 0);
00404
00405 if (buffer_is_full(buf))
00406 return -1;
00407
00408
00409 if (buf->tail == buf->capacity) {
00410
00411 assert(buf->head == 0);
00412 buf->tail = buffer_idx_increment(buf, buf->tail);
00413 }
00414
00415 row = buffer_row_address(buf, buf->head);
00416
00417
00418 if (row->resinfo) {
00419 tds_free_row(row->resinfo, row->row_data);
00420 tds_free_results(row->resinfo);
00421 }
00422 row->row = ++buf->received;
00423 ++resinfo->ref_count;
00424 row->resinfo = resinfo;
00425 row->row_data = NULL;
00426 if (row->sizes)
00427 free(row->sizes);
00428 row->sizes = (TDS_INT *) calloc(resinfo->num_cols, sizeof(TDS_INT));
00429 for (i = 0; i < resinfo->num_cols; ++i)
00430 row->sizes[i] = resinfo->columns[i]->column_cur_size;
00431
00432
00433 buf->current = buf->head;
00434 buf->head = buffer_idx_increment(buf, buf->head);
00435
00436 return buf->current;
00437 }
00438
00439 static int
00440 buffer_save_row(DBPROCESS *dbproc)
00441 {
00442 DBPROC_ROWBUF *buf = &dbproc->row_buf;
00443 DBLIB_BUFFER_ROW *row;
00444 int idx = buf->head - 1;
00445
00446 if (buf->capacity <= 1)
00447 return SUCCEED;
00448
00449 if (idx < 0)
00450 idx = buf->capacity - 1;
00451 if (idx >= 0 && idx < buf->capacity) {
00452 row = &buf->rows[idx];
00453
00454 if (row->resinfo && !row->row_data) {
00455 row->row_data = row->resinfo->current_row;
00456 row->resinfo->current_row = tds_alloc_row(row->resinfo);
00457 }
00458 }
00459
00460 return SUCCEED;
00461 }
00462