00001 typedef struct dblib_buffer_row {
00003 TDSRESULTINFO *resinfo;
00005 unsigned char *row_data;
00007 DBINT row;
00009 TDS_INT *sizes;
00010 } DBLIB_BUFFER_ROW;
00011
00012 static void buffer_struct_print(const DBPROC_ROWBUF *buf);
00013 static int buffer_save_row(DBPROCESS *dbproc);
00014 static DBLIB_BUFFER_ROW* buffer_row_address(const DBPROC_ROWBUF * buf, int idx);
00015
00045 static int
00046 buffer_count(const DBPROC_ROWBUF *buf)
00047 {
00048 return (buf->head > buf->tail) ?
00049 buf->head - buf->tail :
00050 buf->capacity - (buf->tail - buf->head);
00051 }
00052
00056 static int
00057 buffer_is_full(const DBPROC_ROWBUF *buf)
00058 {
00059 return buf->capacity == buffer_count(buf) && buf->capacity > 1;
00060 }
00061
00062 #ifndef NDEBUG
00063 static int
00064 buffer_index_valid(const DBPROC_ROWBUF *buf, int idx)
00065 {
00066 if (buf->tail <= buf->head)
00067 if (buf->head <= idx && idx <= buf->tail)
00068 return 1;
00069
00070 if (0 <= idx && idx <= buf->head)
00071 return 1;
00072
00073 if (buf->tail <= idx && idx < buf->capacity)
00074 return 1;
00075 #if 0
00076 printf("buffer_index_valid: idx = %d\n", idx);
00077 buffer_struct_print(buf);
00078 #endif
00079 return 0;
00080 }
00081 #endif
00082
00083 static void
00084 buffer_free_row(DBLIB_BUFFER_ROW *row)
00085 {
00086 if (row->sizes)
00087 TDS_ZERO_FREE(row->sizes);
00088 if (row->row_data) {
00089 tds_free_row(row->resinfo, row->row_data);
00090 row->row_data = NULL;
00091 }
00092 tds_free_results(row->resinfo);
00093 row->resinfo = NULL;
00094 }
00095
00096
00097
00098
00099
00100
00101
00102
00103
00104
00105
00106 static void
00107 buffer_free(DBPROC_ROWBUF *buf)
00108 {
00109 if (buf->rows != NULL) {
00110 int i;
00111 for (i = 0; i < buf->capacity; ++i)
00112 buffer_free_row(&buf->rows[i]);
00113 TDS_ZERO_FREE(buf->rows);
00114 }
00115 }
00116
00117
00118
00119
00120
00121 static void
00122 buffer_reset(DBPROC_ROWBUF *buf)
00123 {
00124 buf->head = 0;
00125 buf->current = buf->tail = buf->capacity;
00126 }
00127
00128 static int
00129 buffer_idx_increment(const DBPROC_ROWBUF *buf, int idx)
00130 {
00131 if (++idx >= buf->capacity) {
00132 idx = 0;
00133 }
00134 return idx;
00135 }
00136
00141 static DBLIB_BUFFER_ROW*
00142 buffer_row_address(const DBPROC_ROWBUF * buf, int idx)
00143 {
00144 if (!(idx >= 0 && idx < buf->capacity)) {
00145 printf("idx is %d:\n", idx);
00146 buffer_struct_print(buf);
00147 assert(idx >= 0);
00148 assert(idx < buf->capacity);
00149 }
00150
00151 return &(buf->rows[idx]);
00152 }
00153
00157 static DBINT
00158 buffer_idx2row(const DBPROC_ROWBUF *buf, int idx)
00159 {
00160 return buffer_row_address(buf, idx)->row;
00161 }
00162
00166 static int
00167 buffer_row2idx(const DBPROC_ROWBUF *buf, int row_number)
00168 {
00169 int i, ii, idx = -1;
00170
00171 if (buf->tail == buf->capacity) {
00172 assert (buf->head == 0);
00173 return -1;
00174 }
00175
00176
00177
00178
00179
00180 for (ii=0, i = buf->tail; i != buf->head || ii == 0; i = buffer_idx_increment(buf, i)) {
00181 if( buffer_idx2row(buf, i) == row_number) {
00182 idx = i;
00183 break;
00184 }
00185 assert(ii++ < buf->capacity);
00186 }
00187
00188 return idx;
00189 }
00190
00196 static void
00197 buffer_delete_rows(DBPROC_ROWBUF * buf, int count)
00198 {
00199 int i;
00200
00201 if (count < 0 || count > buffer_count(buf)) {
00202 count = buffer_count(buf);
00203 }
00204
00205 for (i=0; i < count; i++) {
00206 if (buf->tail < buf->capacity)
00207 buffer_free_row(&buf->rows[i]);
00208 buf->tail = buffer_idx_increment(buf, buf->tail);
00209
00210
00211
00212
00213 if (buf->tail == buf->head) {
00214 buffer_reset(buf);
00215 break;
00216 }
00217 }
00218 #if 0
00219 buffer_struct_print(buf);
00220 #endif
00221 }
00222
00223 static void
00224 buffer_transfer_bound_data(DBPROC_ROWBUF *buf, TDS_INT res_type, TDS_INT compute_id, DBPROCESS * dbproc, int idx)
00225 {
00226 int i;
00227 int srctype, desttype;
00228 BYTE *src;
00229 const DBLIB_BUFFER_ROW *row;
00230
00231 tdsdump_log(TDS_DBG_FUNC, "buffer_transfer_bound_data(%p %d %d %p %d)\n", buf, res_type, compute_id, dbproc, idx);
00232 assert(buffer_index_valid(buf, idx));
00233
00234 row = buffer_row_address(buf, idx);
00235 assert(row->resinfo);
00236
00237 for (i = 0; i < row->resinfo->num_cols; i++) {
00238 DBINT srclen;
00239 TDSCOLUMN *curcol = row->resinfo->columns[i];
00240
00241 if (row->sizes)
00242 curcol->column_cur_size = row->sizes[i];
00243
00244 if (curcol->column_nullbind) {
00245 if (curcol->column_cur_size < 0) {
00246 *(DBINT *)(curcol->column_nullbind) = -1;
00247 } else {
00248 *(DBINT *)(curcol->column_nullbind) = 0;
00249 }
00250 }
00251 if (!curcol->column_varaddr)
00252 continue;
00253
00254 if (row->row_data)
00255 src = &row->row_data[curcol->column_data - row->resinfo->current_row];
00256 else
00257 src = curcol->column_data;
00258 srclen = curcol->column_cur_size;
00259 if (is_blob_type(curcol->column_type)) {
00260 src = (BYTE *) ((TDSBLOB *) src)->textvalue;
00261 }
00262 desttype = _db_get_server_type(curcol->column_bindtype);
00263 srctype = tds_get_conversion_type(curcol->column_type, curcol->column_size);
00264
00265 if (srclen <= 0) {
00266 if (srclen == 0 || !curcol->column_nullbind)
00267 dbgetnull(dbproc, curcol->column_bindtype, curcol->column_bindlen,
00268 (BYTE *) curcol->column_varaddr);
00269 } else {
00270 copy_data_to_host_var(dbproc, srctype, src, srclen, desttype,
00271 (BYTE *) curcol->column_varaddr, curcol->column_bindlen,
00272 curcol->column_bindtype, curcol->column_nullbind);
00273 }
00274 }
00275
00276
00277
00278
00279
00280
00281
00282
00283 buf->current = buffer_idx_increment(buf, buf->current);
00284
00285 }
00286
00287 static void
00288 buffer_struct_print(const DBPROC_ROWBUF *buf)
00289 {
00290 assert(buf);
00291
00292 printf("\t%d rows in buffer\n", buffer_count(buf));
00293
00294 printf("\thead = %d\t", buf->head);
00295 printf("\ttail = %d\t", buf->tail);
00296 printf("\tcurrent = %d\n", buf->current);
00297 printf("\tcapacity = %d\t", buf->capacity);
00298 printf("\thead row number = %d\n", buf->received);
00299 }
00300
00301
00302
00319 static int
00320 buffer_current_index(const DBPROCESS *dbproc)
00321 {
00322 const DBPROC_ROWBUF *buf = &dbproc->row_buf;
00323 #if 0
00324 buffer_struct_print(buf);
00325 #endif
00326 if (buf->capacity <= 1)
00327 return -1;
00328 if (buf->current == buf->head || buf->current == buf->capacity)
00329 return -1;
00330
00331 assert(buf->current >= 0);
00332 assert(buf->current < buf->capacity);
00333
00334 if( buf->tail < buf->head) {
00335 assert(buf->tail < buf->current);
00336 assert(buf->current < buf->head);
00337 } else {
00338 if (buf->current > buf->head)
00339 assert(buf->current > buf->tail);
00340 }
00341 return buf->current;
00342 }
00343
00344
00345
00346
00347
00348 static void
00349 buffer_set_capacity(DBPROCESS *dbproc, int nrows)
00350 {
00351 DBPROC_ROWBUF *buf = &dbproc->row_buf;
00352
00353 buffer_free(buf);
00354
00355 memset(buf, 0, sizeof(DBPROC_ROWBUF));
00356
00357 if (0 == nrows) {
00358 buf->capacity = 1;
00359 return;
00360 }
00361
00362 assert(0 < nrows);
00363
00364 buf->capacity = nrows;
00365 }
00366
00367
00368
00369
00370
00371
00372
00373
00374
00375 static void
00376 buffer_alloc(DBPROCESS *dbproc)
00377 {
00378 DBPROC_ROWBUF *buf = &dbproc->row_buf;
00379
00380
00381
00382 assert(buf);
00383 assert(buf->capacity > 0);
00384 assert(buf->rows == NULL);
00385
00386 buf->rows = (DBLIB_BUFFER_ROW *) calloc(buf->capacity, sizeof(DBLIB_BUFFER_ROW));
00387
00388 assert(buf->rows);
00389
00390 buffer_reset(buf);
00391
00392 buf->received = 0;
00393 }
00394
00399 static int
00400 buffer_add_row(DBPROCESS *dbproc, TDSRESULTINFO *resinfo)
00401 {
00402 DBPROC_ROWBUF *buf = &dbproc->row_buf;
00403 DBLIB_BUFFER_ROW *row;
00404 int i;
00405
00406 assert(buf->capacity >= 0);
00407
00408 if (buffer_is_full(buf))
00409 return -1;
00410
00411
00412 if (buf->tail == buf->capacity) {
00413
00414 assert(buf->head == 0);
00415 buf->tail = buffer_idx_increment(buf, buf->tail);
00416 }
00417
00418 row = buffer_row_address(buf, buf->head);
00419
00420
00421 if (row->resinfo) {
00422 tds_free_row(row->resinfo, row->row_data);
00423 tds_free_results(row->resinfo);
00424 }
00425 row->row = ++buf->received;
00426 ++resinfo->ref_count;
00427 row->resinfo = resinfo;
00428 row->row_data = NULL;
00429 if (row->sizes)
00430 free(row->sizes);
00431 row->sizes = (TDS_INT *) calloc(resinfo->num_cols, sizeof(TDS_INT));
00432 for (i = 0; i < resinfo->num_cols; ++i)
00433 row->sizes[i] = resinfo->columns[i]->column_cur_size;
00434
00435
00436 buf->current = buf->head;
00437 buf->head = buffer_idx_increment(buf, buf->head);
00438
00439 return buf->current;
00440 }
00441
00442 static int
00443 buffer_save_row(DBPROCESS *dbproc)
00444 {
00445 DBPROC_ROWBUF *buf = &dbproc->row_buf;
00446 DBLIB_BUFFER_ROW *row;
00447 int idx = buf->head - 1;
00448
00449 if (buf->capacity <= 1)
00450 return SUCCEED;
00451
00452 if (idx < 0)
00453 idx = buf->capacity - 1;
00454 if (idx >= 0 && idx < buf->capacity) {
00455 row = &buf->rows[idx];
00456
00457 if (row->resinfo && !row->row_data) {
00458 row->row_data = row->resinfo->current_row;
00459 tds_alloc_row(row->resinfo);
00460 }
00461 }
00462
00463 return SUCCEED;
00464 }
00465